| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.oozie.client; |
| |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.base.Strings; |
| import org.apache.oozie.BuildInfo; |
| import org.apache.oozie.cli.ValidationUtil; |
| import org.apache.oozie.client.rest.JsonTags; |
| import org.apache.oozie.client.rest.JsonToBean; |
| import org.apache.oozie.client.rest.RestConstants; |
| import org.apache.oozie.client.retry.ConnectionRetriableClient; |
| import org.json.simple.JSONArray; |
| import org.json.simple.JSONObject; |
| import org.json.simple.JSONValue; |
| import org.w3c.dom.Document; |
| import org.w3c.dom.Element; |
| |
| import javax.xml.parsers.DocumentBuilderFactory; |
| import javax.xml.transform.Transformer; |
| import javax.xml.transform.TransformerFactory; |
| import javax.xml.transform.dom.DOMSource; |
| import javax.xml.transform.stream.StreamResult; |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.OutputStream; |
| import java.io.PrintStream; |
| import java.io.Reader; |
| import java.net.HttpURLConnection; |
| import java.net.URL; |
| import java.net.URLEncoder; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| |
| |
| /** |
| * Client API to submit and manage Oozie workflow jobs against an Oozie instance. |
| * <p> |
| * This class is thread safe. |
| * <p> |
| * Syntax for filter for the {@link #getJobsInfo(String)} {@link #getJobsInfo(String, int, int)} methods: |
| * <code>[NAME=VALUE][;NAME=VALUE]*</code>. |
| * <p> |
| * Valid filter names are: |
| * <ul> |
| * <li>name: the workflow application name from the workflow definition.</li> |
| * <li>user: the user that submitted the job.</li> |
| * <li>group: the group for the job.</li> |
| * <li>status: the status of the job.</li> |
| * </ul> |
| * <p> |
| * The query will do an AND among all the filter names. The query will do an OR among all the filter values for the same |
| * name. Multiple values must be specified as different name value pairs. |
| */ |
| public class OozieClient { |
| |
| public static final long WS_PROTOCOL_VERSION_0 = 0; |
| |
| public static final long WS_PROTOCOL_VERSION_1 = 1; |
| |
| public static final long WS_PROTOCOL_VERSION = 2; // pointer to current version |
| |
| public static final String USER_NAME = "user.name"; |
| |
| @Deprecated |
| public static final String GROUP_NAME = "group.name"; |
| |
| public static final String JOB_ACL = "oozie.job.acl"; |
| |
| public static final String APP_PATH = "oozie.wf.application.path"; |
| |
| public static final String COORDINATOR_APP_PATH = "oozie.coord.application.path"; |
| |
| public static final String BUNDLE_APP_PATH = "oozie.bundle.application.path"; |
| |
| public static final String BUNDLE_ID = "oozie.bundle.id"; |
| |
| public static final String EXTERNAL_ID = "oozie.wf.external.id"; |
| |
| public static final String WORKFLOW_NOTIFICATION_PROXY = "oozie.wf.workflow.notification.proxy"; |
| |
| public static final String WORKFLOW_NOTIFICATION_URL = "oozie.wf.workflow.notification.url"; |
| |
| public static final String ACTION_NOTIFICATION_URL = "oozie.wf.action.notification.url"; |
| |
| public static final String COORD_ACTION_NOTIFICATION_URL = "oozie.coord.action.notification.url"; |
| |
| public static final String COORD_ACTION_NOTIFICATION_PROXY = "oozie.coord.action.notification.proxy"; |
| |
| public static final String RERUN_SKIP_NODES = "oozie.wf.rerun.skip.nodes"; |
| |
| public static final String RERUN_FAIL_NODES = "oozie.wf.rerun.failnodes"; |
| |
| public static final String LOG_TOKEN = "oozie.wf.log.token"; |
| |
| public static final String ACTION_MAX_RETRIES = "oozie.wf.action.max.retries"; |
| |
| public static final String ACTION_RETRY_INTERVAL = "oozie.wf.action.retry.interval"; |
| |
| public static final String FILTER_USER = "user"; |
| |
| public static final String FILTER_TEXT = "text"; |
| |
| public static final String FILTER_GROUP = "group"; |
| |
| public static final String FILTER_NAME = "name"; |
| |
| public static final String FILTER_STATUS = "status"; |
| |
| public static final String FILTER_NOMINAL_TIME = "nominaltime"; |
| |
| public static final String FILTER_FREQUENCY = "frequency"; |
| |
| public static final String FILTER_ID = "id"; |
| |
| public static final String FILTER_UNIT = "unit"; |
| |
| public static final String FILTER_JOBID = "jobid"; |
| |
| public static final String FILTER_APPNAME = "appname"; |
| |
| public static final String FILTER_CREATED_TIME_START = "startcreatedtime"; |
| |
| public static final String FILTER_CREATED_TIME_END = "endcreatedtime"; |
| |
| public static final String SLA_DISABLE_ALERT = "oozie.sla.disable.alerts"; |
| |
| public static final String SLA_ENABLE_ALERT = "oozie.sla.enable.alerts"; |
| |
| public static final String SLA_DISABLE_ALERT_OLDER_THAN = SLA_DISABLE_ALERT + ".older.than"; |
| |
| public static final String SLA_DISABLE_ALERT_COORD = SLA_DISABLE_ALERT + ".coord"; |
| |
| public static final String CHANGE_VALUE_ENDTIME = "endtime"; |
| |
| public static final String CHANGE_VALUE_PAUSETIME = "pausetime"; |
| |
| public static final String CHANGE_VALUE_CONCURRENCY = "concurrency"; |
| |
| public static final String CHANGE_VALUE_STATUS = "status"; |
| |
| public static final String LIBPATH = "oozie.libpath"; |
| |
| public static final String CONFIG_PATH = "oozie.default.configuration.path"; |
| |
| public static final String USE_SYSTEM_LIBPATH = "oozie.use.system.libpath"; |
| |
| public static final String OOZIE_SUSPEND_ON_NODES = "oozie.suspend.on.nodes"; |
| |
| public static final String FILTER_SORT_BY = "sortby"; |
| |
| public static final String CONFIG_KEY_GENERATED_XML = "oozie.jobs.api.generated.xml"; |
| |
/**
 * Sort keys; each constant carries the full name returned by {@link #getFullname()}.
 */
public enum SORT_BY {
    createdTime("createdTimestamp"),
    lastModifiedTime("lastModifiedTimestamp");

    private final String fullname;

    SORT_BY(String name) {
        fullname = name;
    }

    /**
     * @return the full name associated with this sort key.
     */
    public String getFullname() {
        return this.fullname;
    }
}
| |
/**
 * System modes known to the client.
 */
public enum SYSTEM_MODE {
    NORMAL,
    NOWEBSERVICE,
    SAFEMODE
}
| |
// Status names (as strings) collected per job kind; populated once in the static initializer below.
private static final Set<String> COMPLETED_WF_STATUSES = new HashSet<>();
private static final Set<String> COMPLETED_COORD_AND_BUNDLE_STATUSES = new HashSet<>();
private static final Set<String> COMPLETED_COORD_ACTION_STATUSES = new HashSet<>();
static {
    Collections.addAll(COMPLETED_WF_STATUSES,
            WorkflowJob.Status.FAILED.toString(),
            WorkflowJob.Status.KILLED.toString(),
            WorkflowJob.Status.SUCCEEDED.toString());

    Collections.addAll(COMPLETED_COORD_AND_BUNDLE_STATUSES,
            Job.Status.FAILED.toString(),
            Job.Status.KILLED.toString(),
            Job.Status.SUCCEEDED.toString(),
            Job.Status.DONEWITHERROR.toString(),
            Job.Status.IGNORED.toString());

    Collections.addAll(COMPLETED_COORD_ACTION_STATUSES,
            CoordinatorAction.Status.FAILED.toString(),
            CoordinatorAction.Status.IGNORED.toString(),
            CoordinatorAction.Status.KILLED.toString(),
            CoordinatorAction.Status.SKIPPED.toString(),
            CoordinatorAction.Status.SUCCEEDED.toString(),
            CoordinatorAction.Status.TIMEDOUT.toString());
}
| |
| /** |
| * debugMode =0 means no debugging. 1 means debugging on. |
| */ |
| public int debugMode = 0; |
| |
| private int retryCount = 4; |
| |
| private String baseUrl; |
| private String protocolUrl; |
| private boolean validatedVersion = false; |
| private JSONArray supportedVersions; |
| private final Map<String, String> headers = new HashMap<>(); |
| |
| private static final ThreadLocal<String> USER_NAME_TL = new ThreadLocal<>(); |
| |
| /** |
| * Allows to impersonate other users in the Oozie server. The current user |
| * must be configured as a proxyuser in Oozie. |
| * <p> |
| * IMPORTANT: impersonation happens only with Oozie client requests done within |
| * doAs() calls. |
| * |
| * @param <T> the type of the callable |
| * @param userName user to impersonate. |
| * @param callable callable with {@link OozieClient} calls impersonating the specified user. |
| * @return any response returned by the {@link Callable#call()} method. |
| * @throws Exception thrown by the {@link Callable#call()} method. |
| */ |
| public static <T> T doAs(String userName, Callable<T> callable) throws Exception { |
| notEmpty(userName, "userName"); |
| notNull(callable, "callable"); |
| try { |
| USER_NAME_TL.set(userName); |
| return callable.call(); |
| } |
| finally { |
| USER_NAME_TL.remove(); |
| } |
| } |
| |
/**
 * No-argument constructor for subclasses; leaves the base URL unset.
 */
protected OozieClient() {
    // intentionally empty
}
| |
| /** |
| * Create a Workflow client instance. |
| * |
| * @param oozieUrl URL of the Oozie instance it will interact with. |
| */ |
| public OozieClient(String oozieUrl) { |
| this.baseUrl = notEmpty(oozieUrl, "oozieUrl"); |
| if (!this.baseUrl.endsWith("/")) { |
| this.baseUrl += "/"; |
| } |
| } |
| |
| /** |
| * Return the Oozie URL of the workflow client instance. |
| * <p> |
| * This URL is the base URL fo the Oozie system, with not protocol versioning. |
| * |
| * @return the Oozie URL of the workflow client instance. |
| */ |
| public String getOozieUrl() { |
| return baseUrl; |
| } |
| |
| /** |
| * Return the Oozie URL used by the client and server for WS communications. |
| * <p> |
| * This URL is the original URL plus the versioning element path. |
| * |
| * @return the Oozie URL used by the client and server for communication. |
| * @throws OozieClientException thrown in the client and the server are not protocol compatible. |
| */ |
| public String getProtocolUrl() throws OozieClientException { |
| validateWSVersion(); |
| return protocolUrl; |
| } |
| |
| /** |
| * @return current debug Mode |
| */ |
| public int getDebugMode() { |
| return debugMode; |
| } |
| |
| /** |
| * Set debug mode. |
| * |
| * @param debugMode : 0 means no debugging. 1 means debugging |
| */ |
| public void setDebugMode(int debugMode) { |
| this.debugMode = debugMode; |
| } |
| |
| public int getRetryCount() { |
| return retryCount; |
| } |
| |
| |
| public void setRetryCount(int retryCount) { |
| this.retryCount = retryCount; |
| } |
| |
| private String getBaseURLForVersion(long protocolVersion) throws OozieClientException { |
| try { |
| if (supportedVersions == null) { |
| supportedVersions = getSupportedProtocolVersions(); |
| } |
| if (supportedVersions == null) { |
| throw new OozieClientException("HTTP error", "no response message"); |
| } |
| if (supportedVersions.contains(protocolVersion)) { |
| return baseUrl + "v" + protocolVersion + "/"; |
| } |
| else { |
| throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, "Protocol version " |
| + protocolVersion + " is not supported"); |
| } |
| } |
| catch (IOException e) { |
| throw new OozieClientException(OozieClientException.IO_ERROR, e); |
| } |
| } |
| |
| /** |
| * Validate that the Oozie client and server instances are protocol compatible. |
| * |
| * @throws OozieClientException thrown in the client and the server are not protocol compatible. |
| */ |
| public synchronized void validateWSVersion() throws OozieClientException { |
| if (!validatedVersion) { |
| try { |
| supportedVersions = getSupportedProtocolVersions(); |
| if (supportedVersions == null) { |
| throw new OozieClientException("HTTP error", "no response message"); |
| } |
| if (!supportedVersions.contains(WS_PROTOCOL_VERSION) |
| && !supportedVersions.contains(WS_PROTOCOL_VERSION_1) |
| && !supportedVersions.contains(WS_PROTOCOL_VERSION_0)) { |
| StringBuilder msg = new StringBuilder(); |
| msg.append("Supported version [").append(WS_PROTOCOL_VERSION) |
| .append("] or less, Unsupported versions["); |
| String separator = ""; |
| for (Object version : supportedVersions) { |
| msg.append(separator).append(version); |
| } |
| msg.append("]"); |
| throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, msg.toString()); |
| } |
| if (supportedVersions.contains(WS_PROTOCOL_VERSION)) { |
| protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION + "/"; |
| } |
| else if (supportedVersions.contains(WS_PROTOCOL_VERSION_1)) { |
| protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION_1 + "/"; |
| } |
| else { |
| if (supportedVersions.contains(WS_PROTOCOL_VERSION_0)) { |
| protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION_0 + "/"; |
| } |
| } |
| } |
| catch (IOException ex) { |
| throw new OozieClientException(OozieClientException.IO_ERROR, ex); |
| } |
| validatedVersion = true; |
| } |
| } |
| |
| private JSONArray getSupportedProtocolVersions() throws IOException, OozieClientException { |
| JSONArray versions = null; |
| final URL url = new URL(baseUrl + RestConstants.VERSIONS); |
| |
| HttpURLConnection conn = createRetryableConnection(url, "GET"); |
| |
| if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) { |
| versions = (JSONArray) JSONValue.parse(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)); |
| } |
| else { |
| handleError(conn); |
| } |
| return versions; |
| } |
| |
| /** |
| * Create an empty configuration with just the {@link #USER_NAME} set to the JVM user name. |
| * |
| * @return an empty configuration. |
| */ |
| public Properties createConfiguration() { |
| Properties conf = new Properties(); |
| conf.setProperty(USER_NAME, getUserName()); |
| return conf; |
| } |
| |
| /** |
| * Set a HTTP header to be used in the WS requests by the workflow instance. |
| * |
| * @param name header name. |
| * @param value header value. |
| */ |
| public void setHeader(String name, String value) { |
| headers.put(notEmpty(name, "name"), notNull(value, "value")); |
| } |
| |
| /** |
| * Get the value of a set HTTP header from the workflow instance. |
| * |
| * @param name header name. |
| * @return header value, <code>null</code> if not set. |
| */ |
| public String getHeader(String name) { |
| return headers.get(notEmpty(name, "name")); |
| } |
| |
| /** |
| * Get the set HTTP header |
| * |
| * @return map of header key and value |
| */ |
| public Map<String, String> getHeaders() { |
| return headers; |
| } |
| |
| /** |
| * Remove a HTTP header from the workflow client instance. |
| * |
| * @param name header name. |
| */ |
| public void removeHeader(String name) { |
| headers.remove(notEmpty(name, "name")); |
| } |
| |
| /** |
| * Return an iterator with all the header names set in the workflow instance. |
| * |
| * @return header names. |
| */ |
| public Iterator<String> getHeaderNames() { |
| return Collections.unmodifiableMap(headers).keySet().iterator(); |
| } |
| |
| private URL createURL(Long protocolVersion, String collection, String resource, Map<String, String> parameters) |
| throws IOException, OozieClientException { |
| validateWSVersion(); |
| StringBuilder sb = new StringBuilder(); |
| if (protocolVersion == null) { |
| sb.append(protocolUrl); |
| } |
| else { |
| sb.append(getBaseURLForVersion(protocolVersion)); |
| } |
| sb.append(collection); |
| if (resource != null && resource.length() > 0) { |
| sb.append("/").append(resource); |
| } |
| if (parameters.size() > 0) { |
| String separator = "?"; |
| for (Map.Entry<String, String> param : parameters.entrySet()) { |
| if (param.getValue() != null) { |
| sb.append(separator).append(URLEncoder.encode(param.getKey(), StandardCharsets.UTF_8.name())).append("=") |
| .append(URLEncoder.encode(param.getValue(), StandardCharsets.UTF_8.name())); |
| separator = "&"; |
| } |
| } |
| } |
| return new URL(sb.toString()); |
| } |
| |
| private boolean validateCommand(String url) { |
| { |
| if (protocolUrl.contains(baseUrl + "v0")) { |
| if (url.contains("dryrun") || url.contains("jobtype=c") || url.contains("systemmode")) { |
| return false; |
| } |
| } |
| } |
| return true; |
| } |
| /** |
| * Create retryable http connection to oozie server. |
| * |
| * @param url URL to create connection to |
| * @param method method to use - GET, POST, PUT |
| * @return connection |
| * @throws IOException in case of an exception during connection setup |
| */ |
| protected HttpURLConnection createRetryableConnection(final URL url, final String method) throws IOException { |
| return (HttpURLConnection) new ConnectionRetriableClient(getRetryCount()) { |
| @Override |
| public Object doExecute(URL url, String method) throws IOException, OozieClientException { |
| return createConnection(url, method); |
| } |
| }.execute(url, method); |
| } |
| |
| /** |
| * Create http connection to oozie server. |
| * |
| * @param url URL to create connection to |
| * @param method method to use - GET, POST, PUT |
| * @return connection |
| * @throws IOException in case of an exception during connection setup |
| * @throws OozieClientException if the connection can't be set up |
| */ |
| protected HttpURLConnection createConnection(URL url, String method) throws IOException, OozieClientException { |
| HttpURLConnection conn = (HttpURLConnection) url.openConnection(); |
| conn.setRequestMethod(method); |
| if (method.equals("POST") || method.equals("PUT")) { |
| conn.setDoOutput(true); |
| } |
| for (Map.Entry<String, String> header : headers.entrySet()) { |
| conn.setRequestProperty(header.getKey(), header.getValue()); |
| } |
| return conn; |
| } |
| |
| protected abstract class ClientCallable<T> implements Callable<T> { |
| private final String method; |
| private final String collection; |
| private final String resource; |
| private final Map<String, String> params; |
| private final Long protocolVersion; |
| |
| public ClientCallable(String method, String collection, String resource, Map<String, String> params) { |
| this(method, null, collection, resource, params); |
| } |
| |
| public ClientCallable(String method, Long protocolVersion, String collection, String resource, Map<String, String> params) { |
| this.method = method; |
| this.protocolVersion = protocolVersion; |
| this.collection = collection; |
| this.resource = resource; |
| this.params = params; |
| } |
| |
| public T call() throws OozieClientException { |
| try { |
| URL url = createURL(protocolVersion, collection, resource, params); |
| if (validateCommand(url.toString())) { |
| if (getDebugMode() > 0) { |
| System.out.println(method + " " + url); |
| } |
| return call(createRetryableConnection(url, method)); |
| } |
| else { |
| System.out.println("Option not supported in target server. Supported only on Oozie-2.0 or greater." |
| + " Use 'oozie help' for details"); |
| throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, new Exception()); |
| } |
| } |
| catch (IOException ex) { |
| throw new OozieClientException(OozieClientException.IO_ERROR, ex); |
| } |
| } |
| |
| protected abstract T call(HttpURLConnection conn) throws IOException, OozieClientException; |
| |
| } |
| |
| protected abstract class MapClientCallable extends ClientCallable<Map<String, String>> { |
| |
| MapClientCallable(String method, String collection, String resource, Map<String, String> params) { |
| super(method, collection, resource, params); |
| } |
| |
| @Override |
| protected Map<String, String> call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| Map<String, String> map = new HashMap<>(); |
| for (Object key : json.keySet()) { |
| map.put((String)key, (String)json.get(key)); |
| } |
| return map; |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| static void handleError(HttpURLConnection conn) throws IOException, OozieClientException { |
| int status = conn.getResponseCode(); |
| String error = conn.getHeaderField(RestConstants.OOZIE_ERROR_CODE); |
| String message = conn.getHeaderField(RestConstants.OOZIE_ERROR_MESSAGE); |
| |
| if (error == null) { |
| error = "HTTP error code: " + status; |
| } |
| |
| if (message == null) { |
| message = conn.getResponseMessage(); |
| } |
| throw new OozieClientException(error, message); |
| } |
| |
| static Map<String, String> prepareParams(String... params) { |
| Map<String, String> map = new LinkedHashMap<>(); |
| for (int i = 0; i < params.length; i = i + 2) { |
| map.put(params[i], params[i + 1]); |
| } |
| String doAsUserName = USER_NAME_TL.get(); |
| if (doAsUserName != null) { |
| map.put(RestConstants.DO_AS_PARAM, doAsUserName); |
| } |
| return map; |
| } |
| |
| public void writeToXml(Properties props, OutputStream out) throws IOException { |
| try { |
| Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument(); |
| Element conf = doc.createElement("configuration"); |
| doc.appendChild(conf); |
| conf.appendChild(doc.createTextNode("\n")); |
| for (String name : props.stringPropertyNames()) { // Properties whose key or value is not of type String are omitted. |
| String value = props.getProperty(name); |
| Element propNode = doc.createElement("property"); |
| conf.appendChild(propNode); |
| |
| Element nameNode = doc.createElement("name"); |
| nameNode.appendChild(doc.createTextNode(name.trim())); |
| propNode.appendChild(nameNode); |
| |
| Element valueNode = doc.createElement("value"); |
| valueNode.appendChild(doc.createTextNode(value.trim())); |
| propNode.appendChild(valueNode); |
| |
| conf.appendChild(doc.createTextNode("\n")); |
| } |
| |
| DOMSource source = new DOMSource(doc); |
| StreamResult result = new StreamResult(out); |
| TransformerFactory transFactory = TransformerFactory.newInstance(); |
| transFactory.setFeature("http://javax.xml.XMLConstants/feature/secure-processing", true); |
| Transformer transformer = transFactory.newTransformer(); |
| transformer.transform(source, result); |
| if (getDebugMode() > 0) { |
| result = new StreamResult(System.out); |
| transformer.transform(source, result); |
| System.out.println(); |
| } |
| } |
| catch (Exception e) { |
| throw new IOException(e); |
| } |
| } |
| |
| private class JobSubmit extends ClientCallable<String> { |
| private final Properties conf; |
| private final String generatedXml; |
| |
| JobSubmit(final Properties conf, final boolean start, final String generatedXml) { |
| super("POST", RestConstants.JOBS, "", |
| (start) |
| ? prepareParams(RestConstants.ACTION_PARAM, |
| RestConstants.JOB_ACTION_START, |
| RestConstants.USER_PARAM, |
| getUserName()) |
| : prepareParams(RestConstants.USER_PARAM, |
| getUserName())); |
| this.conf = notNull(conf, "conf"); |
| this.generatedXml = generatedXml; |
| } |
| |
| JobSubmit(String jobId, Properties conf) { |
| super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, |
| RestConstants.JOB_ACTION_RERUN, RestConstants.USER_PARAM, getUserName())); |
| this.conf = notNull(conf, "conf"); |
| this.generatedXml = null; |
| } |
| |
| public JobSubmit(Properties conf, String jobActionDryrun) { |
| super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.ACTION_PARAM, |
| RestConstants.JOB_ACTION_DRYRUN, RestConstants.USER_PARAM, getUserName())); |
| this.conf = notNull(conf, "conf"); |
| this.generatedXml = null; |
| } |
| |
| @Override |
| protected String call(HttpURLConnection conn) throws IOException, OozieClientException { |
| conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); |
| |
| if (!Strings.isNullOrEmpty(generatedXml)) { |
| conf.setProperty(CONFIG_KEY_GENERATED_XML, generatedXml); |
| } |
| |
| writeToXml(conf, conn.getOutputStream()); |
| |
| if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) { |
| JSONObject json = (JSONObject) JSONValue.parse( |
| new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)); |
| return (String) json.get(JsonTags.JOB_ID); |
| } |
| |
| if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { |
| handleError(conn); |
| } |
| |
| return null; |
| } |
| } |
| |
| /** |
| * Submit a workflow job. |
| * |
| * @param conf job configuration. |
| * @return the job Id. |
| * @throws OozieClientException thrown if the job could not be submitted. |
| */ |
| public String submit(Properties conf) throws OozieClientException { |
| return (new JobSubmit(conf, false, null)).call(); |
| } |
| |
| public String submit(final Properties conf, final String generatedXml) throws OozieClientException { |
| return (new JobSubmit(conf, false, generatedXml)).call(); |
| } |
| |
| private class JobAction extends ClientCallable<Void> { |
| |
| JobAction(String jobId, String action) { |
| super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action)); |
| } |
| |
| JobAction(String jobId, String action, String params) { |
| super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action, |
| RestConstants.JOB_CHANGE_VALUE, params)); |
| } |
| |
| @Override |
| protected Void call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if (!(conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class JobsAction extends ClientCallable<JSONObject> { |
| |
| JobsAction(String action, String filter, String jobType, int start, int len) { |
| super("PUT", RestConstants.JOBS, "", |
| prepareParams(RestConstants.ACTION_PARAM, action, |
| RestConstants.JOB_FILTER_PARAM, filter, RestConstants.JOBTYPE_PARAM, jobType, |
| RestConstants.OFFSET_PARAM, Integer.toString(start), |
| RestConstants.LEN_PARAM, Integer.toString(len))); |
| } |
| |
| @Override |
| protected JSONObject call(HttpURLConnection conn) throws IOException, OozieClientException { |
| conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| return (JSONObject) JSONValue.parse(reader); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| /** |
| * Update coord definition. |
| * |
| * @param jobId the job id |
| * @param conf the conf |
| * @param dryrun the dryrun |
| * @param showDiff the show diff |
| * @return the string |
| * @throws OozieClientException the oozie client exception |
| */ |
| public String updateCoord(String jobId, Properties conf, String dryrun, String showDiff) |
| throws OozieClientException { |
| return (new UpdateCoord(jobId, conf, dryrun, showDiff)).call(); |
| } |
| |
| /** |
| * Update coord definition without properties. |
| * |
| * @param jobId the job id |
| * @param dryrun the dryrun |
| * @param showDiff the show diff |
| * @return the string |
| * @throws OozieClientException the oozie client exception |
| */ |
| public String updateCoord(String jobId, String dryrun, String showDiff) throws OozieClientException { |
| return (new UpdateCoord(jobId, dryrun, showDiff)).call(); |
| } |
| |
| /** |
| * The Class UpdateCoord. |
| */ |
| private class UpdateCoord extends ClientCallable<String> { |
| private final Properties conf; |
| |
| public UpdateCoord(String jobId, Properties conf, String jobActionDryrun, String showDiff) { |
| super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, |
| RestConstants.JOB_COORD_UPDATE, RestConstants.JOB_ACTION_DRYRUN, jobActionDryrun, |
| RestConstants.JOB_ACTION_SHOWDIFF, showDiff)); |
| this.conf = conf; |
| } |
| |
| public UpdateCoord(String jobId, String jobActionDryrun, String showDiff) { |
| this(jobId, new Properties(), jobActionDryrun, showDiff); |
| } |
| |
| @Override |
| protected String call(HttpURLConnection conn) throws IOException, OozieClientException { |
| conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); |
| writeToXml(conf, conn.getOutputStream()); |
| |
| if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) { |
| JSONObject json = (JSONObject) JSONValue.parse( |
| new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)); |
| JSONObject update = (JSONObject) json.get(JsonTags.COORD_UPDATE); |
| if (update != null) { |
| return (String) update.get(JsonTags.COORD_UPDATE_DIFF); |
| } |
| else { |
| return ""; |
| } |
| } |
| if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| /** |
| * dryrun for a given job |
| * |
| * @param conf Job configuration. |
| * @return new job. |
| * @throws org.apache.oozie.client.OozieClientException thrown if the job could not be submitted. |
| */ |
| public String dryrun(Properties conf) throws OozieClientException { |
| return new JobSubmit(conf, RestConstants.JOB_ACTION_DRYRUN).call(); |
| } |
| |
| /** |
| * Start a workflow job. |
| * |
| * @param jobId job Id. |
| * @throws OozieClientException thrown if the job could not be started. |
| */ |
| public void start(String jobId) throws OozieClientException { |
| new JobAction(jobId, RestConstants.JOB_ACTION_START).call(); |
| } |
| |
| /** |
| * Submit and start a workflow job. |
| * |
| * @param conf job configuration. |
| * @return the job Id. |
| * @throws OozieClientException thrown if the job could not be submitted. |
| */ |
| public String run(Properties conf) throws OozieClientException { |
| return (new JobSubmit(conf, true, null)).call(); |
| } |
| |
| public String run(final Properties conf, final String generatedXml) throws OozieClientException { |
| return (new JobSubmit(conf, true, generatedXml)).call(); |
| } |
| |
| /** |
| * Rerun a workflow job. |
| * |
| * @param jobId job Id to rerun. |
| * @param conf configuration information for the rerun. |
| * @throws OozieClientException thrown if the job could not be started. |
| */ |
| public void reRun(String jobId, Properties conf) throws OozieClientException { |
| new JobSubmit(jobId, conf).call(); |
| } |
| |
| /** |
| * Suspend a workflow job. |
| * |
| * @param jobId job Id. |
| * @throws OozieClientException thrown if the job could not be suspended. |
| */ |
| public void suspend(String jobId) throws OozieClientException { |
| new JobAction(jobId, RestConstants.JOB_ACTION_SUSPEND).call(); |
| } |
| |
| /** |
| * Resume a workflow job. |
| * |
| * @param jobId job Id. |
| * @throws OozieClientException thrown if the job could not be resume. |
| */ |
| public void resume(String jobId) throws OozieClientException { |
| new JobAction(jobId, RestConstants.JOB_ACTION_RESUME).call(); |
| } |
| |
| /** |
| * Kill a workflow/coord/bundle job. |
| * |
| * @param jobId job Id. |
| * @throws OozieClientException thrown if the job could not be killed. |
| */ |
| public void kill(String jobId) throws OozieClientException { |
| new JobAction(jobId, RestConstants.JOB_ACTION_KILL).call(); |
| } |
| |
| /** |
| * Kill coordinator actions |
| * @param jobId coordinator Job Id |
| * @param rangeType type 'date' if -date is used, 'action-num' if -action is used |
| * @param scope kill scope for date or action nums |
| * @return list of coordinator actions that underwent kill |
| * @throws OozieClientException thrown if some actions could not be killed. |
| */ |
| public List<CoordinatorAction> kill(String jobId, String rangeType, String scope) throws OozieClientException { |
| return new CoordActionsKill(jobId, rangeType, scope).call(); |
| } |
| |
| public JSONObject bulkModifyJobs(String actionType, String filter, String jobType, int start, int len) |
| throws OozieClientException { |
| return new JobsAction(actionType, filter, jobType, start, len).call(); |
| } |
| |
| public JSONObject killJobs(String filter, String jobType, int start, int len) |
| throws OozieClientException { |
| return bulkModifyJobs("kill", filter, jobType, start, len); |
| } |
| |
| public JSONObject suspendJobs(String filter, String jobType, int start, int len) |
| throws OozieClientException { |
| return bulkModifyJobs("suspend", filter, jobType, start, len); |
| } |
| |
| public JSONObject resumeJobs(String filter, String jobType, int start, int len) |
| throws OozieClientException { |
| return bulkModifyJobs("resume", filter, jobType, start, len); |
| } |
| /** |
| * Change a coordinator job. |
| * |
| * @param jobId job Id. |
| * @param changeValue change value. |
| * @throws OozieClientException thrown if the job could not be changed. |
| */ |
| public void change(String jobId, String changeValue) throws OozieClientException { |
| new JobAction(jobId, RestConstants.JOB_ACTION_CHANGE, changeValue).call(); |
| } |
| |
| /** |
| * Ignore a coordinator job. |
| * |
| * @param jobId coord job Id. |
| * @param scope list of coord actions to be ignored |
| * @return the list ignored actions or null if there are none |
| * @throws OozieClientException thrown if the job could not be changed. |
| * @return list of ignored coordinator jobs |
| */ |
| public List<CoordinatorAction> ignore(String jobId, String scope) throws OozieClientException { |
| return new CoordIgnore(jobId, RestConstants.JOB_COORD_SCOPE_ACTION, scope).call(); |
| } |
| |
| private class JobInfo extends ClientCallable<WorkflowJob> { |
| |
| JobInfo(String jobId, int start, int len) { |
| super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, |
| RestConstants.JOB_SHOW_INFO, RestConstants.OFFSET_PARAM, Integer.toString(start), |
| RestConstants.LEN_PARAM, Integer.toString(len))); |
| } |
| |
| @Override |
| protected WorkflowJob call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| return JsonToBean.createWorkflowJob(json); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class JMSInfo extends ClientCallable<JMSConnectionInfo> { |
| |
| JMSInfo() { |
| super("GET", RestConstants.ADMIN, RestConstants.ADMIN_JMS_INFO, prepareParams()); |
| } |
| |
| protected JMSConnectionInfo call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| return JsonToBean.createJMSConnectionInfo(json); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class WorkflowActionInfo extends ClientCallable<WorkflowAction> { |
| WorkflowActionInfo(String actionId) { |
| super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM, |
| RestConstants.JOB_SHOW_INFO)); |
| } |
| |
| @Override |
| protected WorkflowAction call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| return JsonToBean.createWorkflowAction(json); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class WorkflowActionRetriesInfo extends ClientCallable<List<Map<String, String>>> { |
| WorkflowActionRetriesInfo(String actionId) { |
| super("GET", RestConstants.JOB, notEmpty(actionId, "id"), |
| prepareParams(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_ACTION_RETRIES_PARAM)); |
| } |
| |
| @Override |
| protected List<Map<String, String>> call(HttpURLConnection conn) |
| throws IOException, OozieClientException { |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| return new ObjectMapper().readValue(json.get(JsonTags.WORKFLOW_ACTION_RETRIES).toString(), |
| new TypeReference<List<Map<String, String>>>() { |
| }); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| /** |
| * Get the info of a workflow job. |
| * |
| * @param jobId job Id. |
| * @return the job info. |
| * @throws OozieClientException thrown if the job info could not be retrieved. |
| */ |
| public WorkflowJob getJobInfo(String jobId) throws OozieClientException { |
| return getJobInfo(jobId, 0, 0); |
| } |
| |
| /** |
| * Get the JMS Connection info |
| * @return JMSConnectionInfo object |
| * @throws OozieClientException thrown if the JMS info could not be retrieved. |
| */ |
| public JMSConnectionInfo getJMSConnectionInfo() throws OozieClientException { |
| return new JMSInfo().call(); |
| } |
| |
| /** |
| * Get the info of a workflow job and subset actions. |
| * |
| * @param jobId job Id. |
| * @param start starting index in the list of actions belonging to the job |
| * @param len number of actions to be returned |
| * @return the job info. |
| * @throws OozieClientException thrown if the job info could not be retrieved. |
| */ |
| public WorkflowJob getJobInfo(String jobId, int start, int len) throws OozieClientException { |
| return new JobInfo(jobId, start, len).call(); |
| } |
| |
| /** |
| * Get the info of a workflow action. |
| * |
| * @param actionId Id. |
| * @return the workflow action info. |
| * @throws OozieClientException thrown if the job info could not be retrieved. |
| */ |
| public WorkflowAction getWorkflowActionInfo(String actionId) throws OozieClientException { |
| return new WorkflowActionInfo(actionId).call(); |
| } |
| |
| |
| /** |
| * Get the info of a workflow action. |
| * |
| * @param actionId Id. |
| * @return the workflow action retries info. |
| * @throws OozieClientException thrown if the job info could not be retrieved. |
| */ |
| public List<Map<String, String>> getWorkflowActionRetriesInfo(String actionId) throws OozieClientException { |
| return new WorkflowActionRetriesInfo(actionId).call(); |
| } |
| |
| /** |
| * Get the log of a workflow job. |
| * |
| * @param jobId job Id. |
| * @return the job log. |
| * @throws OozieClientException thrown if the job info could not be retrieved. |
| */ |
| public String getJobLog(String jobId) throws OozieClientException { |
| return new JobLog(jobId).call(); |
| } |
| |
| /** |
| * Get the audit log of a job. |
| * |
| * @param jobId the id of the job |
| * @param ps the stream to write the result into |
| * @throws OozieClientException thrown if the job audit log could not be retrieved. |
| */ |
| public void getJobAuditLog(String jobId, PrintStream ps) throws OozieClientException { |
| new JobAuditLog(jobId, ps).call(); |
| } |
| |
| /** |
| * Get the log of a job. |
| * |
| * @param jobId job Id. |
| * @param logRetrievalType Based on which filter criteria the log is retrieved |
| * @param logRetrievalScope Value for the retrieval type |
| * @param logFilter log filter |
| * @param ps Printstream of command line interface |
| * @throws OozieClientException thrown if the job info could not be retrieved. |
| */ |
| public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, String logFilter, |
| PrintStream ps) throws OozieClientException { |
| new JobLog(jobId, logRetrievalType, logRetrievalScope, logFilter, ps).call(); |
| } |
| |
| /** |
| * Get the error log of a job. |
| * |
| * @param jobId the id of the job |
| * @param ps the stream to write the result into |
| * @throws OozieClientException thrown if the job log could not be retrieved. |
| */ |
| public void getJobErrorLog(String jobId, PrintStream ps) throws OozieClientException { |
| new JobErrorLog(jobId, ps).call(); |
| } |
| |
| /** |
| * Get the log of a job. |
| * |
| * @param jobId job Id. |
| * @param logRetrievalType Based on which filter criteria the log is retrieved |
| * @param logRetrievalScope Value for the retrieval type |
| * @param ps Printstream of command line interface |
| * @throws OozieClientException thrown if the job info could not be retrieved. |
| */ |
| public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps) |
| throws OozieClientException { |
| getJobLog(jobId, logRetrievalType, logRetrievalScope, null, ps); |
| } |
| |
| private class JobLog extends JobMetadata { |
| JobLog(String jobId) { |
| super(jobId, RestConstants.JOB_SHOW_LOG); |
| } |
| JobLog(String jobId, String logRetrievalType, String logRetrievalScope, String logFilter, PrintStream ps) { |
| super(jobId, logRetrievalType, logRetrievalScope, RestConstants.JOB_SHOW_LOG, logFilter, ps); |
| } |
| } |
| |
| private class JobErrorLog extends JobMetadata { |
| JobErrorLog(String jobId, PrintStream ps) { |
| super(jobId, RestConstants.JOB_SHOW_ERROR_LOG, ps); |
| } |
| } |
| |
| private class JobAuditLog extends JobMetadata { |
| JobAuditLog(String jobId, PrintStream ps) { |
| super(jobId, RestConstants.JOB_SHOW_AUDIT_LOG, ps); |
| } |
| } |
| |
| |
| /** |
| * Gets the JMS topic name for a particular job |
| * @param jobId given jobId |
| * @return the JMS topic name |
| * @throws OozieClientException thrown if the JMS topic could not be retrieved. |
| */ |
| public String getJMSTopicName(String jobId) throws OozieClientException { |
| return new JMSTopic(jobId).call(); |
| } |
| |
| private class JMSTopic extends ClientCallable<String> { |
| |
| JMSTopic(String jobId) { |
| super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, |
| RestConstants.JOB_SHOW_JMS_TOPIC)); |
| } |
| |
| protected String call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| return (String) json.get(JsonTags.JMS_TOPIC_NAME); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| /** |
| * Get the definition of a workflow job. |
| * |
| * @param jobId job Id. |
| * @return the job log. |
| * @throws OozieClientException thrown if the job info could not be retrieved. |
| */ |
| public String getJobDefinition(String jobId) throws OozieClientException { |
| return new JobDefinition(jobId).call(); |
| } |
| |
| /** |
| * Get coord action missing dependencies |
| * @param jobId the id of the job |
| * @param actionList the list of coordinator actions |
| * @param dates a comma-separated list of date ranges. Each date range element is specified with two dates separated by '::' |
| * @param ps the stream to write the result into |
| * @throws OozieClientException thrown if the info could not be retrieved. |
| */ |
| public void getCoordActionMissingDependencies(String jobId, String actionList, String dates, PrintStream ps) |
| throws OozieClientException { |
| new CoordActionMissingDependencies(jobId, actionList, dates, ps).call(); |
| } |
| |
| /** |
| * Get coord action missing dependencies |
| * @param jobId the id of the job |
| * @param actionList the list of coordinator actions |
| * @param dates a comma-separated list of date ranges. Each date range element is specified with two dates separated by '::' |
| * @throws OozieClientException thrown if the info could not be retrieved. |
| */ |
| public void getCoordActionMissingDependencies(String jobId, String actionList, String dates) |
| throws OozieClientException { |
| new CoordActionMissingDependencies(jobId, actionList, dates).call(); |
| } |
| |
| private class CoordActionMissingDependencies extends ClientCallable<String> { |
| PrintStream printStream; |
| public CoordActionMissingDependencies(String jobId, String actionList, String dates, PrintStream ps) { |
| super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, |
| RestConstants.COORD_ACTION_MISSING_DEPENDENCIES, RestConstants.JOB_COORD_SCOPE_ACTION_LIST, actionList, |
| RestConstants.JOB_COORD_SCOPE_DATE, dates)); |
| this.printStream = ps; |
| } |
| |
| public CoordActionMissingDependencies(String jobId, String actionList, String dates) { |
| this(jobId, actionList, dates, System.out); |
| } |
| |
| |
| @SuppressWarnings("unchecked") |
| protected String call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| if (json != null) { |
| JSONArray inputDependencies = (JSONArray) json.get(JsonTags.COORD_ACTION_MISSING_DEPENDENCIES); |
| for (Object dependencies : inputDependencies) { |
| printStream.println(); |
| printStream.println("CoordAction : " |
| + ((JSONObject) dependencies).get(JsonTags.COORDINATOR_ACTION_ID)); |
| if (((JSONObject) dependencies).get(JsonTags.COORD_ACTION_FIRST_MISSING_DEPENDENCIES) != null) { |
| printStream.println("Blocked on : " |
| + ((JSONObject) dependencies) |
| .get(JsonTags.COORD_ACTION_FIRST_MISSING_DEPENDENCIES)); |
| } |
| |
| if (((JSONObject) dependencies).get(JsonTags.COORDINATOR_ACTION_DATASETS) != null) { |
| |
| JSONArray missingDependencies = (JSONArray) ((JSONObject) dependencies) |
| .get(JsonTags.COORDINATOR_ACTION_DATASETS); |
| |
| for (Object missingDependenciesJson : missingDependencies) { |
| |
| printStream.println("Dataset : " |
| + ((JSONObject) missingDependenciesJson) |
| .get(JsonTags.COORDINATOR_ACTION_DATASET)); |
| |
| JSONArray inputDependenciesList = (JSONArray) ((JSONObject) missingDependenciesJson) |
| .get(JsonTags.COORDINATOR_ACTION_MISSING_DEPS); |
| printStream.println("Pending Dependencies : "); |
| |
| if (inputDependenciesList != null) { |
| for (String anInputDependenciesList : (Iterable<String>) inputDependenciesList) { |
| printStream.println("\t " + anInputDependenciesList); |
| } |
| } |
| printStream.println(); |
| } |
| } |
| } |
| } |
| else { |
| printStream.println(" No missing input dependencies found"); |
| } |
| |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class JobDefinition extends JobMetadata { |
| |
| JobDefinition(String jobId) { |
| super(jobId, RestConstants.JOB_SHOW_DEFINITION); |
| } |
| } |
| |
| private class JobMetadata extends ClientCallable<String> { |
| PrintStream printStream; |
| |
| JobMetadata(String jobId, String metaType) { |
| super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, |
| metaType)); |
| } |
| |
| JobMetadata(String jobId, String metaType, PrintStream ps) { |
| this(jobId, metaType); |
| printStream = ps; |
| |
| } |
| |
| JobMetadata(String jobId, String logRetrievalType, String logRetrievalScope, String metaType, String logFilter, |
| PrintStream ps) { |
| super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, |
| metaType, RestConstants.JOB_LOG_TYPE_PARAM, logRetrievalType, RestConstants.JOB_LOG_SCOPE_PARAM, |
| logRetrievalScope, RestConstants.LOG_FILTER_OPTION, logFilter)); |
| printStream = ps; |
| } |
| |
| @Override |
| protected String call(HttpURLConnection conn) throws IOException, OozieClientException { |
| String returnVal = null; |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| InputStream is = conn.getInputStream(); |
| try (InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8)) { |
| if (printStream != null) { |
| sendToOutputStream(isr, -1, printStream); |
| } else { |
| returnVal = getReaderAsString(isr, -1); |
| } |
| } |
| } |
| else { |
| handleError(conn); |
| } |
| return returnVal; |
| } |
| |
| /** |
| * Output the log to command line interface |
| * |
| * @param reader reader to read into a string. |
| * @param maxLen max content length allowed, if -1 there is no limit. |
| * @throws IOException |
| */ |
| private void sendToOutputStream(Reader reader, int maxLen, PrintStream printStream) throws IOException { |
| notNull(reader, "reader"); |
| StringBuilder sb = new StringBuilder(); |
| char[] buffer = new char[2048]; |
| int read; |
| int count = 0; |
| int noOfCharstoFlush = 1024; |
| while ((read = reader.read(buffer)) > -1) { |
| count += read; |
| if ((maxLen > -1) && (count > maxLen)) { |
| break; |
| } |
| sb.append(buffer, 0, read); |
| if (sb.length() > noOfCharstoFlush) { |
| printStream.print(sb.toString()); |
| sb = new StringBuilder(""); |
| } |
| } |
| printStream.print(sb.toString()); |
| } |
| |
| /** |
| * Return a reader as string. |
| * <p> |
| * |
| * @param reader reader to read into a string. |
| * @param maxLen max content length allowed, if -1 there is no limit. |
| * @return the reader content. |
| * @throws IOException thrown if the resource could not be read. |
| */ |
| private String getReaderAsString(Reader reader, int maxLen) throws IOException { |
| notNull(reader, "reader"); |
| StringBuilder sb = new StringBuilder(); |
| char[] buffer = new char[2048]; |
| int read; |
| int count = 0; |
| while ((read = reader.read(buffer)) > -1) { |
| count += read; |
| |
| // read up to maxLen chars; |
| if ((maxLen > -1) && (count > maxLen)) { |
| break; |
| } |
| sb.append(buffer, 0, read); |
| } |
| reader.close(); |
| return sb.toString(); |
| } |
| } |
| |
| private class CoordJobInfo extends ClientCallable<CoordinatorJob> { |
| |
| CoordJobInfo(String jobId, String filter, int start, int len, String order) { |
| super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, |
| RestConstants.JOB_SHOW_INFO, RestConstants.JOB_FILTER_PARAM, filter, RestConstants.OFFSET_PARAM, |
| Integer.toString(start), RestConstants.LEN_PARAM, Integer.toString(len), RestConstants.ORDER_PARAM, |
| order)); |
| } |
| |
| @Override |
| protected CoordinatorJob call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| return JsonToBean.createCoordinatorJob(json); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class WfsForCoordAction extends ClientCallable<List<WorkflowJob>> { |
| |
| WfsForCoordAction(String coordActionId) { |
| super("GET", RestConstants.JOB, notEmpty(coordActionId, "coordActionId"), prepareParams( |
| RestConstants.JOB_SHOW_PARAM, RestConstants.ALL_WORKFLOWS_FOR_COORD_ACTION)); |
| } |
| |
| @Override |
| protected List<WorkflowJob> call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| JSONArray workflows = (JSONArray) json.get(JsonTags.WORKFLOWS_JOBS); |
| if (workflows == null) { |
| workflows = new JSONArray(); |
| } |
| return JsonToBean.createWorkflowJobList(workflows); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| |
| private class BundleJobInfo extends ClientCallable<BundleJob> { |
| |
| BundleJobInfo(String jobId) { |
| super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, |
| RestConstants.JOB_SHOW_INFO)); |
| } |
| |
| @Override |
| protected BundleJob call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| return JsonToBean.createBundleJob(json); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class CoordActionInfo extends ClientCallable<CoordinatorAction> { |
| CoordActionInfo(String actionId) { |
| super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM, |
| RestConstants.JOB_SHOW_INFO)); |
| } |
| |
| @Override |
| protected CoordinatorAction call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| return JsonToBean.createCoordinatorAction(json); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| /** |
| * Get the info of a bundle job. |
| * |
| * @param jobId job Id. |
| * @return the job info. |
| * @throws OozieClientException thrown if the job info could not be retrieved. |
| */ |
| public BundleJob getBundleJobInfo(String jobId) throws OozieClientException { |
| return new BundleJobInfo(jobId).call(); |
| } |
| |
| /** |
| * Get the info of a coordinator job. |
| * |
| * @param jobId job Id. |
| * @return the job info. |
| * @throws OozieClientException thrown if the job info could not be retrieved. |
| */ |
| public CoordinatorJob getCoordJobInfo(String jobId) throws OozieClientException { |
| return new CoordJobInfo(jobId, null, -1, -1, "asc").call(); |
| } |
| |
| /** |
| * Get the info of a coordinator job and subset actions. |
| * |
| * @param jobId job Id. |
| * @param filter filter the status filter |
| * @param start starting index in the list of actions belonging to the job |
| * @param len number of actions to be returned |
| * @return the job info. |
| * @throws OozieClientException thrown if the job info could not be retrieved. |
| */ |
| public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len) |
| throws OozieClientException { |
| return new CoordJobInfo(jobId, filter, start, len, "asc").call(); |
| } |
| |
| /** |
| * Get the info of a coordinator job and subset actions. |
| * |
| * @param jobId job Id. |
| * @param filter filter the status filter |
| * @param start starting index in the list of actions belonging to the job |
| * @param len number of actions to be returned |
| * @param order order to list coord actions (e.g, desc) |
| * @return the job info. |
| * @throws OozieClientException thrown if the job info could not be retrieved. |
| */ |
| public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len, String order) |
| throws OozieClientException { |
| return new CoordJobInfo(jobId, filter, start, len, order).call(); |
| } |
| |
| public List<WorkflowJob> getWfsForCoordAction(String coordActionId) throws OozieClientException { |
| return new WfsForCoordAction(coordActionId).call(); |
| } |
| |
| /** |
| * Get the info of a coordinator action. |
| * |
| * @param actionId Id. |
| * @return the coordinator action info. |
| * @throws OozieClientException thrown if the job info could not be retrieved. |
| */ |
| public CoordinatorAction getCoordActionInfo(String actionId) throws OozieClientException { |
| return new CoordActionInfo(actionId).call(); |
| } |
| |
| private class JobsStatus extends ClientCallable<List<WorkflowJob>> { |
| |
| JobsStatus(String filter, int start, int len) { |
| super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter, |
| RestConstants.JOBTYPE_PARAM, "wf", RestConstants.OFFSET_PARAM, Integer.toString(start), |
| RestConstants.LEN_PARAM, Integer.toString(len))); |
| } |
| |
| @Override |
| protected List<WorkflowJob> call(HttpURLConnection conn) throws IOException, OozieClientException { |
| conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| JSONArray workflows = (JSONArray) json.get(JsonTags.WORKFLOWS_JOBS); |
| if (workflows == null) { |
| workflows = new JSONArray(); |
| } |
| return JsonToBean.createWorkflowJobList(workflows); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class CoordJobsStatus extends ClientCallable<List<CoordinatorJob>> { |
| |
| CoordJobsStatus(String filter, int start, int len) { |
| super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter, |
| RestConstants.JOBTYPE_PARAM, "coord", RestConstants.OFFSET_PARAM, Integer.toString(start), |
| RestConstants.LEN_PARAM, Integer.toString(len))); |
| } |
| |
| @Override |
| protected List<CoordinatorJob> call(HttpURLConnection conn) throws IOException, OozieClientException { |
| conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| JSONArray jobs = (JSONArray) json.get(JsonTags.COORDINATOR_JOBS); |
| if (jobs == null) { |
| jobs = new JSONArray(); |
| } |
| return JsonToBean.createCoordinatorJobList(jobs); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class BundleJobsStatus extends ClientCallable<List<BundleJob>> { |
| |
| BundleJobsStatus(String filter, int start, int len) { |
| super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter, |
| RestConstants.JOBTYPE_PARAM, "bundle", RestConstants.OFFSET_PARAM, Integer.toString(start), |
| RestConstants.LEN_PARAM, Integer.toString(len))); |
| } |
| |
| @Override |
| protected List<BundleJob> call(HttpURLConnection conn) throws IOException, OozieClientException { |
| conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| JSONArray jobs = (JSONArray) json.get(JsonTags.BUNDLE_JOBS); |
| if (jobs == null) { |
| jobs = new JSONArray(); |
| } |
| return JsonToBean.createBundleJobList(jobs); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class BulkResponseStatus extends ClientCallable<List<BulkResponse>> { |
| |
| BulkResponseStatus(String filter, int start, int len) { |
| super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_BULK_PARAM, filter, |
| RestConstants.OFFSET_PARAM, Integer.toString(start), RestConstants.LEN_PARAM, Integer.toString(len))); |
| } |
| |
| @Override |
| protected List<BulkResponse> call(HttpURLConnection conn) throws IOException, OozieClientException { |
| conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| JSONArray results = (JSONArray) json.get(JsonTags.BULK_RESPONSES); |
| if (results == null) { |
| results = new JSONArray(); |
| } |
| return JsonToBean.createBulkResponseList(results); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class CoordActionsKill extends ClientCallable<List<CoordinatorAction>> { |
| |
| CoordActionsKill(String jobId, String rangeType, String scope) { |
| super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, |
| RestConstants.JOB_ACTION_KILL, RestConstants.JOB_COORD_RANGE_TYPE_PARAM, rangeType, |
| RestConstants.JOB_COORD_SCOPE_PARAM, scope)); |
| } |
| |
| @Override |
| protected List<CoordinatorAction> call(HttpURLConnection conn) throws IOException, OozieClientException { |
| conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| JSONArray coordActions = (JSONArray) json.get(JsonTags.COORDINATOR_ACTIONS); |
| return JsonToBean.createCoordinatorActionList(coordActions); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class CoordIgnore extends ClientCallable<List<CoordinatorAction>> { |
| CoordIgnore(String jobId, String rerunType, String scope) { |
| super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, |
| RestConstants.JOB_ACTION_IGNORE, RestConstants.JOB_COORD_RANGE_TYPE_PARAM, |
| rerunType, RestConstants.JOB_COORD_SCOPE_PARAM, scope)); |
| } |
| |
| @Override |
| protected List<CoordinatorAction> call(HttpURLConnection conn) throws IOException, OozieClientException { |
| conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| if(json != null) { |
| JSONArray coordActions = (JSONArray) json.get(JsonTags.COORDINATOR_ACTIONS); |
| return JsonToBean.createCoordinatorActionList(coordActions); |
| } |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| private class CoordRerun extends ClientCallable<List<CoordinatorAction>> { |
| private final Properties conf; |
| |
| CoordRerun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup, boolean failed, |
| Properties conf) { |
| super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, |
| RestConstants.JOB_COORD_ACTION_RERUN, RestConstants.JOB_COORD_RANGE_TYPE_PARAM, rerunType, |
| RestConstants.JOB_COORD_SCOPE_PARAM, scope, RestConstants.JOB_COORD_RERUN_REFRESH_PARAM, |
| Boolean.toString(refresh), RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean |
| .toString(noCleanup), RestConstants.JOB_COORD_RERUN_FAILED_PARAM, Boolean.toString(failed))); |
| this.conf = conf; |
| } |
| |
| @Override |
| protected List<CoordinatorAction> call(HttpURLConnection conn) throws IOException, OozieClientException { |
| conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| JSONArray coordActions = (JSONArray) json.get(JsonTags.COORDINATOR_ACTIONS); |
| return JsonToBean.createCoordinatorActionList(coordActions); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class BundleRerun extends ClientCallable<Void> { |
| |
| BundleRerun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) { |
| super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, |
| RestConstants.JOB_BUNDLE_ACTION_RERUN, RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM, |
| coordScope, RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM, dateScope, |
| RestConstants.JOB_COORD_RERUN_REFRESH_PARAM, Boolean.toString(refresh), |
| RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean.toString(noCleanup))); |
| } |
| |
| @Override |
| protected Void call(HttpURLConnection conn) throws IOException, OozieClientException { |
| conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| return null; |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| /** |
| * Rerun coordinator actions. |
| * |
| * @param jobId coordinator jobId |
| * @param rerunType rerun type 'date' if -date is used, 'action-id' if -action is used |
| * @param scope rerun scope for date or actionIds |
| * @param refresh true if -refresh is given in command option |
| * @param noCleanup true if -nocleanup is given in command option |
| * @return the list of rerun coordinator actions |
| * @throws OozieClientException thrown if the info could not be retrieved. |
| */ |
| public List<CoordinatorAction> reRunCoord(String jobId, String rerunType, String scope, boolean refresh, |
| boolean noCleanup) throws OozieClientException { |
| return new CoordRerun(jobId, rerunType, scope, refresh, noCleanup, false, null).call(); |
| } |
| |
| /** |
| * Rerun coordinator actions with failed option. |
| * |
| * @param jobId coordinator jobId |
| * @param rerunType rerun type 'date' if -date is used, 'action-id' if -action is used |
| * @param scope rerun scope for date or actionIds |
| * @param refresh true if -refresh is given in command option |
| * @param noCleanup true if -nocleanup is given in command option |
| * @param failed true if -failed is given in command option |
| * @param props properties to use during rerun |
| * @return new coordinator job execution |
| * @throws OozieClientException thrown if the info could not be retrieved. |
| */ |
| public List<CoordinatorAction> reRunCoord(String jobId, String rerunType, String scope, boolean refresh, |
| boolean noCleanup, boolean failed, Properties props) throws OozieClientException { |
| return new CoordRerun(jobId, rerunType, scope, refresh, noCleanup, failed, props).call(); |
| } |
| |
| /** |
| * Rerun bundle coordinators. |
| * |
| * @param jobId bundle jobId |
| * @param coordScope rerun scope for coordinator jobs |
| * @param dateScope rerun scope for date |
| * @param refresh true if -refresh is given in command option |
| * @param noCleanup true if -nocleanup is given in command option |
| * @throws OozieClientException thrown if the info could not be retrieved. |
| */ |
| public Void reRunBundle(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) |
| throws OozieClientException { |
| return new BundleRerun(jobId, coordScope, dateScope, refresh, noCleanup).call(); |
| } |
| |
| /** |
| * Return the info of the workflow jobs that match the filter. |
| * |
| * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax. |
| * @param start jobs offset, base 1. |
| * @param len number of jobs to return. |
| * @return a list with the workflow jobs info, without node details. |
| * @throws OozieClientException thrown if the jobs info could not be retrieved. |
| */ |
| public List<WorkflowJob> getJobsInfo(String filter, int start, int len) throws OozieClientException { |
| return new JobsStatus(filter, start, len).call(); |
| } |
| |
| /** |
| * Return the info of the workflow jobs that match the filter. |
| * <p> |
| * It returns the first 100 jobs that match the filter. |
| * |
| * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax. |
| * @return a list with the workflow jobs info, without node details. |
| * @throws OozieClientException thrown if the jobs info could not be retrieved. |
| */ |
| public List<WorkflowJob> getJobsInfo(String filter) throws OozieClientException { |
| return getJobsInfo(filter, 1, 50); |
| } |
| |
| /** |
| * Sla enable alert. |
| * |
| * @param jobIds the job ids |
| * @param actions comma separated list of action ids or action id ranges |
| * @param dates comma separated list of the nominal times |
| * @throws OozieClientException the oozie client exception |
| */ |
| public void slaEnableAlert(String jobIds, String actions, String dates) throws OozieClientException { |
| new UpdateSLA(RestConstants.SLA_ENABLE_ALERT, jobIds, actions, dates, null).call(); |
| } |
| |
| /** |
| * Sla enable alert for bundle with coord name/id. |
| * |
| * @param bundleId the bundle id |
| * @param actions comma separated list of action ids or action id ranges |
| * @param dates comma separated list of the nominal times |
| * @param coords the coordinators |
| * @throws OozieClientException the oozie client exception |
| */ |
| public void slaEnableAlert(String bundleId, String actions, String dates, String coords) |
| throws OozieClientException { |
| new UpdateSLA(RestConstants.SLA_ENABLE_ALERT, bundleId, actions, dates, coords).call(); |
| } |
| |
| /** |
| * Sla disable alert. |
| * |
| * @param jobIds the job ids |
| * @param actions comma separated list of action ids or action id ranges |
| * @param dates comma separated list of the nominal times |
| * @throws OozieClientException the oozie client exception |
| */ |
| public void slaDisableAlert(String jobIds, String actions, String dates) throws OozieClientException { |
| new UpdateSLA(RestConstants.SLA_DISABLE_ALERT, jobIds, actions, dates, null).call(); |
| } |
| |
| /** |
| * Sla disable alert for bundle with coord name/id. |
| * |
| * @param bundleId the bundle id |
| * @param actions comma separated list of action ids or action id ranges |
| * @param dates comma separated list of the nominal times |
| * @param coords the coordinators |
| * @throws OozieClientException the oozie client exception |
| */ |
| public void slaDisableAlert(String bundleId, String actions, String dates, String coords) |
| throws OozieClientException { |
| new UpdateSLA(RestConstants.SLA_DISABLE_ALERT, bundleId, actions, dates, coords).call(); |
| } |
| |
| /** |
| * Sla change definations. |
| * SLA change definition parameters can be [<key>=<value>,...<key>=<value>] |
| * Supported parameter key names are should-start, should-end and max-duration |
| * @param jobIds the job ids |
| * @param actions comma separated list of action ids or action id ranges. |
| * @param dates comma separated list of the nominal times |
| * @param newSlaParams the new sla params |
| * @throws OozieClientException the oozie client exception |
| */ |
| public void slaChange(String jobIds, String actions, String dates, String newSlaParams) throws OozieClientException { |
| new UpdateSLA(RestConstants.SLA_CHANGE, jobIds, actions, dates, null, newSlaParams).call(); |
| } |
| |
| /** |
| * Sla change defination for bundle with coord name/id. |
| * SLA change definition parameters can be [<key>=<value>,...<key>=<value>] |
| * Supported parameter key names are should-start, should-end and max-duration |
| * @param bundleId the bundle id |
| * @param actions comma separated list of action ids or action id ranges |
| * @param dates comma separated list of the nominal times |
| * @param coords the coords |
| * @param newSlaParams the new sla params |
| * @throws OozieClientException the oozie client exception |
| */ |
| public void slaChange(String bundleId, String actions, String dates, String coords, String newSlaParams) |
| throws OozieClientException { |
| new UpdateSLA(RestConstants.SLA_CHANGE, bundleId, actions, dates, coords, newSlaParams).call(); |
| } |
| |
| /** |
| * Sla change with new sla param as hasmap. |
| * Supported parameter key names are should-start, should-end and max-duration |
| * @param bundleId the bundle id |
| * @param actions comma separated list of action ids or action id ranges |
| * @param dates comma separated list of the nominal times |
| * @param coords the coords |
| * @param newSlaParams the new sla params |
| * @throws OozieClientException the oozie client exception |
| */ |
| public void slaChange(String bundleId, String actions, String dates, String coords, Map<String, String> newSlaParams) |
| throws OozieClientException { |
| new UpdateSLA(RestConstants.SLA_CHANGE, bundleId, actions, dates, coords, mapToString(newSlaParams)).call(); |
| } |
| |
| /** |
| * Convert Map to string. |
| * |
| * @param map the map |
| * @return the string |
| */ |
| protected String mapToString(Map<String, String> map) { |
| StringBuilder sb = new StringBuilder(); |
| for (Entry<String, String> e : map.entrySet()) { |
| sb.append(e.getKey()).append("=").append(e.getValue()).append(";"); |
| } |
| return sb.toString(); |
| } |
| |
| private class UpdateSLA extends ClientCallable<Void> { |
| |
| UpdateSLA(String action, String jobIds, String coordActions, String dates, String coords) { |
| super("PUT", RestConstants.JOB, notEmpty(jobIds, "jobIds"), prepareParams(RestConstants.ACTION_PARAM, |
| action, RestConstants.JOB_COORD_SCOPE_ACTION_LIST, coordActions, RestConstants.JOB_COORD_SCOPE_DATE, |
| dates, RestConstants.COORDINATORS_PARAM, coords)); |
| } |
| |
| UpdateSLA(String action, String jobIds, String coordActions, String dates, String coords, String newSlaParams) { |
| super("PUT", RestConstants.JOB, notEmpty(jobIds, "jobIds"), prepareParams(RestConstants.ACTION_PARAM, |
| action, RestConstants.JOB_COORD_SCOPE_ACTION_LIST, coordActions, RestConstants.JOB_COORD_SCOPE_DATE, |
| dates, RestConstants.COORDINATORS_PARAM, coords, RestConstants.JOB_CHANGE_VALUE, newSlaParams)); |
| } |
| |
| @Override |
| protected Void call(HttpURLConnection conn) throws IOException, OozieClientException { |
| conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| System.out.println("Done"); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| /** |
| * Print sla info about coordinator and workflow jobs and actions. |
| * |
| * @param start starting offset |
| * @param len number of results |
| * @param filter filters to use. Elements must be semicolon-separated name=value pairs |
| * @throws OozieClientException thrown if the sla info could not be retrieved |
| */ |
| public void getSlaInfo(int start, int len, String filter) throws OozieClientException { |
| new SlaInfo(start, len, filter).call(); |
| } |
| |
| private class SlaInfo extends ClientCallable<Void> { |
| |
| SlaInfo(int start, int len, String filter) { |
| super("GET", WS_PROTOCOL_VERSION_1, RestConstants.SLA, "", prepareParams(RestConstants.SLA_GT_SEQUENCE_ID, |
| Integer.toString(start), RestConstants.MAX_EVENTS, Integer.toString(len), |
| RestConstants.JOBS_FILTER_PARAM, filter)); |
| } |
| |
| @Override |
| protected Void call(HttpURLConnection conn) throws IOException, OozieClientException { |
| conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| BufferedReader br = new BufferedReader( |
| new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)); |
| String line; |
| while ((line = br.readLine()) != null) { |
| System.out.println(line); |
| } |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class JobIdAction extends ClientCallable<String> { |
| |
| JobIdAction(String externalId) { |
| super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, "wf", |
| RestConstants.JOBS_EXTERNAL_ID_PARAM, externalId)); |
| } |
| |
| @Override |
| protected String call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| return (String) json.get(JsonTags.JOB_ID); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| /** |
| * Return the workflow job Id for an external Id. |
| * <p> |
| * The external Id must have provided at job creation time. |
| * |
| * @param externalId external Id given at job creation time. |
| * @return the workflow job Id for an external Id, <code>null</code> if none. |
| * @throws OozieClientException thrown if the operation could not be done. |
| */ |
| public String getJobId(String externalId) throws OozieClientException { |
| return new JobIdAction(externalId).call(); |
| } |
| |
| private class SetSystemMode extends ClientCallable<Void> { |
| |
| public SetSystemMode(SYSTEM_MODE status) { |
| super("PUT", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE, prepareParams( |
| RestConstants.ADMIN_SYSTEM_MODE_PARAM, status + "")); |
| } |
| |
| @Override |
| public Void call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| /** |
| * Enable or disable safe mode. Used by OozieCLI. In safe mode, Oozie would not accept any commands except status |
| * command to change and view the safe mode status. |
| * |
| * @param status true to enable safe mode, false to disable safe mode. |
| * @throws OozieClientException if it fails to set the safe mode status. |
| */ |
| public void setSystemMode(SYSTEM_MODE status) throws OozieClientException { |
| new SetSystemMode(status).call(); |
| } |
| |
| private class GetSystemMode extends ClientCallable<SYSTEM_MODE> { |
| |
| GetSystemMode() { |
| super("GET", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE, prepareParams()); |
| } |
| |
| @Override |
| protected SYSTEM_MODE call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| return SYSTEM_MODE.valueOf((String) json.get(JsonTags.OOZIE_SYSTEM_MODE)); |
| } |
| else { |
| handleError(conn); |
| } |
| return SYSTEM_MODE.NORMAL; |
| } |
| } |
| |
| /** |
| * Returns if Oozie is in safe mode or not. |
| * |
| * @return true if safe mode is ON<br> |
| * false if safe mode is OFF |
| * @throws OozieClientException throw if it could not obtain the safe mode status. |
| */ |
| /* |
| * public boolean isInSafeMode() throws OozieClientException { return new GetSafeMode().call(); } |
| */ |
| public SYSTEM_MODE getSystemMode() throws OozieClientException { |
| return new GetSystemMode().call(); |
| } |
| |
| public String updateShareLib() throws OozieClientException { |
| return new UpdateSharelib().call(); |
| } |
| |
| public String listShareLib(String sharelibKey) throws OozieClientException { |
| return new ListShareLib(sharelibKey).call(); |
| } |
| |
| private class GetBuildVersion extends ClientCallable<String> { |
| |
| GetBuildVersion() { |
| super("GET", RestConstants.ADMIN, RestConstants.ADMIN_BUILD_VERSION_RESOURCE, prepareParams()); |
| } |
| |
| @Override |
| protected String call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| return json.get(JsonTags.BUILD_INFO).toString(); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class ValidateXML extends ClientCallable<String> { |
| |
| String file = null; |
| |
| ValidateXML(String file, String user) { |
| super("POST", WS_PROTOCOL_VERSION, RestConstants.VALIDATE, "", |
| prepareParams(RestConstants.FILE_PARAM, file, RestConstants.USER_PARAM, user)); |
| this.file = file; |
| } |
| |
| @Override |
| protected String call(HttpURLConnection conn) throws IOException, OozieClientException { |
| conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); |
| if (file.startsWith("/")) { |
| FileInputStream fi = new FileInputStream(new File(file)); |
| byte[] buffer = new byte[1024]; |
| int n; |
| while (-1 != (n = fi.read(buffer))) { |
| conn.getOutputStream().write(buffer, 0, n); |
| } |
| } |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| return (String) json.get(JsonTags.VALIDATE); |
| } |
| else if ((conn.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND)) { |
| return null; |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class SubmitXML extends ClientCallable<String> { |
| SubmitXML(final String xml, final String user) { |
| super("POST", |
| WS_PROTOCOL_VERSION, |
| RestConstants.ACTION_PARAM, |
| RestConstants.JOB_ACTION_SUBMIT, |
| prepareParams()); |
| } |
| |
| @Override |
| protected String call(final HttpURLConnection conn) throws IOException, OozieClientException { |
| conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); |
| return null; |
| } |
| } |
| |
| private class UpdateSharelib extends ClientCallable<String> { |
| |
| UpdateSharelib() { |
| super("GET", RestConstants.ADMIN, RestConstants.ADMIN_UPDATE_SHARELIB, prepareParams( |
| RestConstants.ALL_SERVER_REQUEST, "true")); |
| } |
| |
| @Override |
| protected String call(HttpURLConnection conn) throws IOException, OozieClientException { |
| StringBuilder bf = new StringBuilder(); |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| Object sharelib = JSONValue.parse(reader); |
| bf.append("[ShareLib update status]").append(System.getProperty("line.separator")); |
| if (sharelib instanceof JSONArray) { |
| for (Object o : ((JSONArray) sharelib)) { |
| JSONObject obj = (JSONObject) ((JSONObject) o).get(JsonTags.SHARELIB_LIB_UPDATE); |
| for (Object key : obj.keySet()) { |
| bf.append("\t").append(key).append(" = ").append(obj.get(key)) |
| .append(System.getProperty("line.separator")); |
| } |
| bf.append(System.getProperty("line.separator")); |
| } |
| } |
| else{ |
| JSONObject obj = (JSONObject) ((JSONObject) sharelib).get(JsonTags.SHARELIB_LIB_UPDATE); |
| for (Object key : obj.keySet()) { |
| bf.append("\t").append(key).append(" = ").append(obj.get(key)) |
| .append(System.getProperty("line.separator")); |
| } |
| bf.append(System.getProperty("line.separator")); |
| } |
| return bf.toString(); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class ListShareLib extends ClientCallable<String> { |
| |
| ListShareLib(String sharelibKey) { |
| super("GET", RestConstants.ADMIN, RestConstants.ADMIN_LIST_SHARELIB, prepareParams( |
| RestConstants.SHARE_LIB_REQUEST_KEY, sharelibKey)); |
| } |
| |
| @Override |
| protected String call(HttpURLConnection conn) throws IOException, OozieClientException { |
| |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| StringBuilder bf = new StringBuilder(); |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| Object sharelib = json.get(JsonTags.SHARELIB_LIB); |
| bf.append("[Available ShareLib]").append(System.getProperty("line.separator")); |
| if (sharelib instanceof JSONArray) { |
| for (Object o : ((JSONArray) sharelib)) { |
| JSONObject obj = (JSONObject) o; |
| bf.append(obj.get(JsonTags.SHARELIB_LIB_NAME)) |
| .append(System.getProperty("line.separator")); |
| if (obj.get(JsonTags.SHARELIB_LIB_FILES) != null) { |
| for (Object file : ((JSONArray) obj.get(JsonTags.SHARELIB_LIB_FILES))) { |
| bf.append("\t").append(file).append(System.getProperty("line.separator")); |
| } |
| } |
| } |
| return bf.toString(); |
| } |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| |
| } |
| |
| public String purgeCommand(String purgeOptions) throws OozieClientException { |
| String workflowAge = ""; |
| String coordAge = ""; |
| String bundleAge = ""; |
| String purgeLimit = ""; |
| String oldCoordAction = ""; |
| if (purgeOptions != null) { |
| String options[] = purgeOptions.split(";"); |
| for (String option : options) { |
| String pair[] = option.split("="); |
| if (pair.length < 2) { |
| throw new OozieClientException(OozieClientException.INVALID_INPUT, |
| "Invalid purge option pair [" + option + "] specified."); |
| } |
| String key = pair[0].toLowerCase(); |
| String value = pair[1]; |
| switch (key) { |
| case "wf": |
| workflowAge = String.valueOf(ValidationUtil.parsePositiveInteger(value)); |
| break; |
| case "coord": |
| coordAge = String.valueOf(ValidationUtil.parsePositiveInteger(value)); |
| break; |
| case "bundle": |
| bundleAge = String.valueOf(ValidationUtil.parsePositiveInteger(value)); |
| break; |
| case "limit": |
| purgeLimit = String.valueOf(ValidationUtil.parsePositiveInteger(value)); |
| break; |
| case "oldcoordaction": |
| oldCoordAction = value; |
| break; |
| default: |
| throw new OozieClientException(OozieClientException.INVALID_INPUT, |
| "Invalid purge option [" + key + "] specified."); |
| } |
| } |
| } |
| PurgeCommand purgeCommand = new PurgeCommand(workflowAge, coordAge, bundleAge, purgeLimit, oldCoordAction); |
| return purgeCommand.call(); |
| } |
| |
| private class PurgeCommand extends ClientCallable<String> { |
| |
| PurgeCommand(String workflowAge, String coordAge, String bundleAge, String purgeLimit, String oldCoordAction) { |
| super("PUT", RestConstants.ADMIN, RestConstants.ADMIN_PURGE, prepareParams( |
| RestConstants.PURGE_WF_AGE, workflowAge, |
| RestConstants.PURGE_COORD_AGE, coordAge, |
| RestConstants.PURGE_BUNDLE_AGE, bundleAge, |
| RestConstants.PURGE_LIMIT, purgeLimit, |
| RestConstants.PURGE_OLD_COORD_ACTION, oldCoordAction |
| )); |
| } |
| |
| @Override |
| protected String call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject jsonObject = (JSONObject) JSONValue.parse(reader); |
| Object msg = jsonObject.get(JsonTags.PURGE); |
| return msg.toString(); |
| } else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| /** |
| * Return the Oozie server build version. |
| * |
| * @return the Oozie server build version. |
| * @throws OozieClientException throw if the server build version could not be retrieved. |
| */ |
| public String getServerBuildVersion() throws OozieClientException { |
| return new GetBuildVersion().call(); |
| } |
| |
| /** |
| * Return the Oozie client build version. |
| * |
| * @return the Oozie client build version. |
| */ |
| public String getClientBuildVersion() { |
| return BuildInfo.getBuildInfo().getProperty(BuildInfo.BUILD_VERSION); |
| } |
| |
| /** |
| * Return the workflow application is valid. |
| * |
| * @param file local file or hdfs file. |
| * @return the workflow application is valid. |
| * @throws OozieClientException throw if it the workflow application's validation could not be retrieved. |
| */ |
| public String validateXML(String file) throws OozieClientException { |
| String fileName = file; |
| if (file.startsWith("file://")) { |
| fileName = file.substring(7, file.length()); |
| } |
| if (!fileName.contains("://")) { |
| File f = new File(fileName); |
| if (!f.isFile()) { |
| throw new OozieClientException("File error", "File does not exist : " + f.getAbsolutePath()); |
| } |
| fileName = f.getAbsolutePath(); |
| } |
| return new ValidateXML(fileName, getUserName()).call(); |
| } |
| |
| private String getUserName() { |
| String userName = USER_NAME_TL.get(); |
| if (userName == null) { |
| userName = System.getProperty("user.name"); |
| } |
| return userName; |
| } |
| |
| /** |
| * Return the info of the coordinator jobs that match the filter. |
| * |
| * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax. |
| * @param start jobs offset, base 1. |
| * @param len number of jobs to return. |
| * @return a list with the coordinator jobs info |
| * @throws OozieClientException thrown if the jobs info could not be retrieved. |
| */ |
| public List<CoordinatorJob> getCoordJobsInfo(String filter, int start, int len) throws OozieClientException { |
| return new CoordJobsStatus(filter, start, len).call(); |
| } |
| |
| /** |
| * Return the info of the bundle jobs that match the filter. |
| * |
| * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax. |
| * @param start jobs offset, base 1. |
| * @param len number of jobs to return. |
| * @return a list with the bundle jobs info |
| * @throws OozieClientException thrown if the jobs info could not be retrieved. |
| */ |
| public List<BundleJob> getBundleJobsInfo(String filter, int start, int len) throws OozieClientException { |
| return new BundleJobsStatus(filter, start, len).call(); |
| } |
| |
| public List<BulkResponse> getBulkInfo(String filter, int start, int len) throws OozieClientException { |
| return new BulkResponseStatus(filter, start, len).call(); |
| } |
| |
| /** |
| * Poll a job (Workflow Job ID, Coordinator Job ID, Coordinator Action ID, or Bundle Job ID) and return when it has reached a |
| * terminal state. |
| * (i.e. FAILED, KILLED, SUCCEEDED) |
| * |
| * @param id The Job ID |
| * @param timeout timeout in minutes (negative values indicate no timeout) |
| * @param interval polling interval in minutes (must be positive) |
| * @param verbose if true, the current status will be printed out at each poll; if false, no output |
| * @throws OozieClientException thrown if the job's status could not be retrieved |
| */ |
| public void pollJob(String id, int timeout, int interval, boolean verbose) throws OozieClientException { |
| notEmpty("id", id); |
| if (interval < 1) { |
| throw new IllegalArgumentException("interval must be a positive integer"); |
| } |
| boolean noTimeout = (timeout < 1); |
| long endTime = System.currentTimeMillis() + timeout * 60 * 1000; |
| interval *= 60 * 1000; |
| |
| final Set<String> completedStatuses; |
| if (id.endsWith("-W")) { |
| completedStatuses = COMPLETED_WF_STATUSES; |
| } else if (id.endsWith("-C")) { |
| completedStatuses = COMPLETED_COORD_AND_BUNDLE_STATUSES; |
| } else if (id.endsWith("-B")) { |
| completedStatuses = COMPLETED_COORD_AND_BUNDLE_STATUSES; |
| } else if (id.contains("-C@")) { |
| completedStatuses = COMPLETED_COORD_ACTION_STATUSES; |
| } else { |
| throw new IllegalArgumentException("invalid job type"); |
| } |
| |
| String status = getStatus(id); |
| if (verbose) { |
| System.out.println(status); |
| } |
| while(!completedStatuses.contains(status) && (noTimeout || System.currentTimeMillis() <= endTime)) { |
| try { |
| Thread.sleep(interval); |
| } catch (InterruptedException ie) { |
| // ignore |
| } |
| status = getStatus(id); |
| if (verbose) { |
| System.out.println(status); |
| } |
| } |
| } |
| |
| /** |
| * Gets the status for a particular job (Workflow Job ID, Coordinator Job ID, Coordinator Action ID, or Bundle Job ID). |
| * |
| * @param jobId given jobId |
| * @return the status |
| * @throws OozieClientException thrown if the status could not be retrieved |
| */ |
| public String getStatus(String jobId) throws OozieClientException { |
| return new Status(jobId).call(); |
| } |
| |
| private class Status extends ClientCallable<String> { |
| |
| Status(String jobId) { |
| super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, |
| RestConstants.JOB_SHOW_STATUS)); |
| } |
| |
| @Override |
| protected String call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| return (String) json.get(JsonTags.STATUS); |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| private class GetQueueDump extends ClientCallable<List<String>> { |
| GetQueueDump() { |
| super("GET", RestConstants.ADMIN, RestConstants.ADMIN_QUEUE_DUMP_RESOURCE, prepareParams()); |
| } |
| |
| @Override |
| protected List<String> call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if ((conn.getResponseCode() != HttpURLConnection.HTTP_OK)) { |
| handleError(conn); |
| return null; |
| } |
| |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| List<String> queueDumpMessages = new ArrayList<>(); |
| |
| addSeparator(queueDumpMessages); |
| |
| addQueueMessages(json, |
| queueDumpMessages, |
| JsonTags.QUEUE_DUMP, |
| JsonTags.CALLABLE_DUMP, |
| "[Server Queue Dump]:", |
| "The queue dump is empty, nothing to display."); |
| |
| addSeparator(queueDumpMessages); |
| |
| addQueueMessages(json, |
| queueDumpMessages, |
| JsonTags.UNIQUE_MAP_DUMP, |
| JsonTags.UNIQUE_ENTRY_DUMP, |
| "[Server Uniqueness Map Dump]:", |
| "The uniqueness map dump is empty, nothing to display."); |
| |
| addSeparator(queueDumpMessages); |
| |
| return queueDumpMessages; |
| } |
| |
| private void addQueueMessages(JSONObject json, |
| List<String> queueMessages, |
| String queueName, |
| String queueValue, |
| String headerMessage, |
| String emptyMessage) { |
| |
| JSONArray queueDumpArray = (JSONArray) json.get(queueName); |
| queueMessages.add(headerMessage); |
| |
| for (Object o : queueDumpArray) { |
| JSONObject entry = (JSONObject) o; |
| if (entry.get(queueValue) != null) { |
| String value = (String) entry.get(queueValue); |
| queueMessages.add(value); |
| } |
| } |
| |
| if (queueDumpArray.isEmpty()) { |
| queueMessages.add(emptyMessage); |
| } |
| } |
| |
| private void addSeparator(List<String> list) { |
| list.add("******************************************"); |
| } |
| } |
| |
| /** |
| * Return the Oozie queue's commands' dump |
| * |
| * @return the list of strings of callable identification in queue |
| * @throws OozieClientException throw if it the queue dump could not be retrieved. |
| */ |
| public List<String> getQueueDump() throws OozieClientException { |
| return new GetQueueDump().call(); |
| } |
| |
| private class GetAvailableOozieServers extends MapClientCallable { |
| |
| GetAvailableOozieServers() { |
| super("GET", RestConstants.ADMIN, RestConstants.ADMIN_AVAILABLE_OOZIE_SERVERS_RESOURCE, prepareParams()); |
| } |
| } |
| |
| /** |
| * Return the list of available Oozie servers. |
| * |
| * @return the list of available Oozie servers. |
| * @throws OozieClientException throw if it the list of available Oozie servers could not be retrieved. |
| */ |
| public Map<String, String> getAvailableOozieServers() throws OozieClientException { |
| return new GetAvailableOozieServers().call(); |
| } |
| |
| private class GetServerConfiguration extends MapClientCallable { |
| |
| GetServerConfiguration() { |
| super("GET", RestConstants.ADMIN, RestConstants.ADMIN_CONFIG_RESOURCE, prepareParams()); |
| } |
| } |
| |
| /** |
| * Return the Oozie system configuration. |
| * |
| * @return the Oozie system configuration. |
| * @throws OozieClientException throw if the system configuration could not be retrieved. |
| */ |
| public Map<String, String> getServerConfiguration() throws OozieClientException { |
| return new GetServerConfiguration().call(); |
| } |
| |
| private class GetJavaSystemProperties extends MapClientCallable { |
| |
| GetJavaSystemProperties() { |
| super("GET", RestConstants.ADMIN, RestConstants.ADMIN_JAVA_SYS_PROPS_RESOURCE, prepareParams()); |
| } |
| } |
| |
| /** |
| * Return the Oozie Java system properties. |
| * |
| * @return the Oozie Java system properties. |
| * @throws OozieClientException throw if the system properties could not be retrieved. |
| */ |
| public Map<String, String> getJavaSystemProperties() throws OozieClientException { |
| return new GetJavaSystemProperties().call(); |
| } |
| |
| private class GetOSEnv extends MapClientCallable { |
| |
| GetOSEnv() { |
| super("GET", RestConstants.ADMIN, RestConstants.ADMIN_OS_ENV_RESOURCE, prepareParams()); |
| } |
| } |
| |
| /** |
| * Return the Oozie system OS environment. |
| * |
| * @return the Oozie system OS environment. |
| * @throws OozieClientException throw if the system OS environment could not be retrieved. |
| */ |
| public Map<String, String> getOSEnv() throws OozieClientException { |
| return new GetOSEnv().call(); |
| } |
| |
| private class GetMetrics extends ClientCallable<Metrics> { |
| |
| GetMetrics() { |
| super("GET", RestConstants.ADMIN, RestConstants.ADMIN_METRICS_RESOURCE, prepareParams()); |
| } |
| |
| @Override |
| protected Metrics call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| return new Metrics(json); |
| } |
| else if ((conn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE)) { |
| // Use Instrumentation endpoint |
| return null; |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| public class Metrics { |
| private Map<String, Long> counters; |
| private Map<String, Object> gauges; |
| private Map<String, Timer> timers; |
| private Map<String, Histogram> histograms; |
| |
| @SuppressWarnings("unchecked") |
| public Metrics(JSONObject json) { |
| JSONObject jCounters = (JSONObject) json.get("counters"); |
| counters = new HashMap<>(jCounters.size()); |
| for (Object entO : jCounters.entrySet()) { |
| Entry<String, JSONObject> ent = (Entry<String, JSONObject>) entO; |
| counters.put(ent.getKey(), (Long)ent.getValue().get("count")); |
| } |
| |
| JSONObject jGuages = (JSONObject) json.get("gauges"); |
| gauges = new HashMap<>(jGuages.size()); |
| for (Object entO : jGuages.entrySet()) { |
| Entry<String, JSONObject> ent = (Entry<String, JSONObject>) entO; |
| gauges.put(ent.getKey(), ent.getValue().get("value")); |
| } |
| |
| JSONObject jTimers = (JSONObject) json.get("timers"); |
| timers = new HashMap<>(jTimers.size()); |
| for (Object entO : jTimers.entrySet()) { |
| Entry<String, JSONObject> ent = (Entry<String, JSONObject>) entO; |
| timers.put(ent.getKey(), new Timer(ent.getValue())); |
| } |
| |
| JSONObject jHistograms = (JSONObject) json.get("histograms"); |
| histograms = new HashMap<>(jHistograms.size()); |
| for (Object entO : jHistograms.entrySet()) { |
| Entry<String, JSONObject> ent = (Entry<String, JSONObject>) entO; |
| histograms.put(ent.getKey(), new Histogram(ent.getValue())); |
| } |
| } |
| |
| public Map<String, Long> getCounters() { |
| return counters; |
| } |
| |
| public Map<String, Object> getGauges() { |
| return gauges; |
| } |
| |
| public Map<String, Timer> getTimers() { |
| return timers; |
| } |
| |
| public Map<String, Histogram> getHistograms() { |
| return histograms; |
| } |
| |
| public class Timer extends Histogram { |
| private double m15Rate; |
| private double m5Rate; |
| private double m1Rate; |
| private double meanRate; |
| private String durationUnits; |
| private String rateUnits; |
| |
| public Timer(JSONObject json) { |
| super(json); |
| m15Rate = Double.valueOf(json.get("m15_rate").toString()); |
| m5Rate = Double.valueOf(json.get("m5_rate").toString()); |
| m1Rate = Double.valueOf(json.get("m1_rate").toString()); |
| meanRate = Double.valueOf(json.get("mean_rate").toString()); |
| durationUnits = json.get("duration_units").toString(); |
| rateUnits = json.get("rate_units").toString(); |
| } |
| |
| public double get15MinuteRate() { |
| return m15Rate; |
| } |
| |
| public double get5MinuteRate() { |
| return m5Rate; |
| } |
| |
| public double get1MinuteRate() { |
| return m1Rate; |
| } |
| |
| public double getMeanRate() { |
| return meanRate; |
| } |
| |
| public String getDurationUnits() { |
| return durationUnits; |
| } |
| |
| public String getRateUnits() { |
| return rateUnits; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(super.toString()); |
| sb.append("\n\t15 minute rate : ").append(m15Rate); |
| sb.append("\n\t5 minute rate : ").append(m5Rate); |
| sb.append("\n\t1 minute rate : ").append(m15Rate); |
| sb.append("\n\tmean rate : ").append(meanRate); |
| sb.append("\n\tduration units : ").append(durationUnits); |
| sb.append("\n\trate units : ").append(rateUnits); |
| return sb.toString(); |
| } |
| } |
| |
| public class Histogram { |
| private double p999; |
| private double p99; |
| private double p98; |
| private double p95; |
| private double p75; |
| private double p50; |
| private double mean; |
| private double min; |
| private double max; |
| private double stdDev; |
| private long count; |
| |
| public Histogram(JSONObject json) { |
| p999 = Double.valueOf(json.get("p999").toString()); |
| p99 = Double.valueOf(json.get("p99").toString()); |
| p98 = Double.valueOf(json.get("p98").toString()); |
| p95 = Double.valueOf(json.get("p95").toString()); |
| p75 = Double.valueOf(json.get("p75").toString()); |
| p50 = Double.valueOf(json.get("p50").toString()); |
| mean = Double.valueOf(json.get("mean").toString()); |
| min = Double.valueOf(json.get("min").toString()); |
| max = Double.valueOf(json.get("max").toString()); |
| stdDev = Double.valueOf(json.get("stddev").toString()); |
| count = Long.valueOf(json.get("count").toString()); |
| } |
| |
| public double get999thPercentile() { |
| return p999; |
| } |
| |
| public double get99thPercentile() { |
| return p99; |
| } |
| |
| public double get98thPercentile() { |
| return p98; |
| } |
| |
| public double get95thPercentile() { |
| return p95; |
| } |
| |
| public double get75thPercentile() { |
| return p75; |
| } |
| |
| public double get50thPercentile() { |
| return p50; |
| } |
| |
| public double getMean() { |
| return mean; |
| } |
| |
| public double getMin() { |
| return min; |
| } |
| |
| public double getMax() { |
| return max; |
| } |
| |
| public double getStandardDeviation() { |
| return stdDev; |
| } |
| |
| public long getCount() { |
| return count; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("\t999th percentile : ").append(p999); |
| sb.append("\n\t99th percentile : ").append(p99); |
| sb.append("\n\t98th percentile : ").append(p98); |
| sb.append("\n\t95th percentile : ").append(p95); |
| sb.append("\n\t75th percentile : ").append(p75); |
| sb.append("\n\t50th percentile : ").append(p50); |
| sb.append("\n\tmean : ").append(mean); |
| sb.append("\n\tmax : ").append(max); |
| sb.append("\n\tmin : ").append(min); |
| sb.append("\n\tcount : ").append(count); |
| sb.append("\n\tstandard deviation : ").append(stdDev); |
| return sb.toString(); |
| } |
| } |
| } |
| |
| /** |
| * Return the Oozie metrics. If null is returned, then try {@link #getInstrumentation()}. |
| * |
| * @return the Oozie metrics or null. |
| * @throws OozieClientException throw if the metrics could not be retrieved. |
| */ |
| public Metrics getMetrics() throws OozieClientException { |
| return new GetMetrics().call(); |
| } |
| |
| private class GetInstrumentation extends ClientCallable<Instrumentation> { |
| |
| GetInstrumentation() { |
| super("GET", RestConstants.ADMIN, RestConstants.ADMIN_INSTRUMENTATION_RESOURCE, prepareParams()); |
| } |
| |
| @Override |
| protected Instrumentation call(HttpURLConnection conn) throws IOException, OozieClientException { |
| if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { |
| Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); |
| JSONObject json = (JSONObject) JSONValue.parse(reader); |
| return new Instrumentation(json); |
| } |
| else if ((conn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE)) { |
| // Use Metrics endpoint |
| return null; |
| } |
| else { |
| handleError(conn); |
| } |
| return null; |
| } |
| } |
| |
| public class Instrumentation { |
| private Map<String, Long> counters; |
| private Map<String, Object> variables; |
| private Map<String, Double> samplers; |
| private Map<String, Timer> timers; |
| |
| public Instrumentation(JSONObject json) { |
| JSONArray jCounters = (JSONArray) json.get("counters"); |
| counters = new HashMap<>(jCounters.size()); |
| for (Object groupO : jCounters) { |
| JSONObject group = (JSONObject) groupO; |
| String groupName = group.get("group").toString() + "."; |
| JSONArray data = (JSONArray) group.get("data"); |
| for (Object datO : data) { |
| JSONObject dat = (JSONObject) datO; |
| counters.put(groupName + dat.get("name").toString(), Long.valueOf(dat.get("value").toString())); |
| } |
| } |
| |
| JSONArray jVariables = (JSONArray) json.get("variables"); |
| variables = new HashMap<>(jVariables.size()); |
| for (Object groupO : jVariables) { |
| JSONObject group = (JSONObject) groupO; |
| String groupName = group.get("group").toString() + "."; |
| JSONArray data = (JSONArray) group.get("data"); |
| for (Object datO : data) { |
| JSONObject dat = (JSONObject) datO; |
| variables.put(groupName + dat.get("name").toString(), dat.get("value")); |
| } |
| } |
| |
| JSONArray jSamplers = (JSONArray) json.get("samplers"); |
| samplers = new HashMap<>(jSamplers.size()); |
| for (Object groupO : jSamplers) { |
| JSONObject group = (JSONObject) groupO; |
| String groupName = group.get("group").toString() + "."; |
| JSONArray data = (JSONArray) group.get("data"); |
| for (Object datO : data) { |
| JSONObject dat = (JSONObject) datO; |
| samplers.put(groupName + dat.get("name").toString(), Double.valueOf(dat.get("value").toString())); |
| } |
| } |
| |
| JSONArray jTimers = (JSONArray) json.get("timers"); |
| timers = new HashMap<>(jTimers.size()); |
| for (Object groupO : jTimers) { |
| JSONObject group = (JSONObject) groupO; |
| String groupName = group.get("group").toString() + "."; |
| JSONArray data = (JSONArray) group.get("data"); |
| for (Object datO : data) { |
| JSONObject dat = (JSONObject) datO; |
| timers.put(groupName + dat.get("name").toString(), new Timer(dat)); |
| } |
| } |
| } |
| |
| public class Timer { |
| private double ownTimeStdDev; |
| private long ownTimeAvg; |
| private long ownMaxTime; |
| private long ownMinTime; |
| private double totalTimeStdDev; |
| private long totalTimeAvg; |
| private long totalMaxTime; |
| private long totalMinTime; |
| private long ticks; |
| |
| public Timer(JSONObject json) { |
| ownTimeStdDev = Double.valueOf(json.get("ownTimeStdDev").toString()); |
| ownTimeAvg = Long.valueOf(json.get("ownTimeAvg").toString()); |
| ownMaxTime = Long.valueOf(json.get("ownMaxTime").toString()); |
| ownMinTime = Long.valueOf(json.get("ownMinTime").toString()); |
| totalTimeStdDev = Double.valueOf(json.get("totalTimeStdDev").toString()); |
| totalTimeAvg = Long.valueOf(json.get("totalTimeAvg").toString()); |
| totalMaxTime = Long.valueOf(json.get("totalMaxTime").toString()); |
| totalMinTime = Long.valueOf(json.get("totalMinTime").toString()); |
| ticks = Long.valueOf(json.get("ticks").toString()); |
| } |
| |
| public double getOwnTimeStandardDeviation() { |
| return ownTimeStdDev; |
| } |
| |
| public long getOwnTimeAverage() { |
| return ownTimeAvg; |
| } |
| |
| public long getOwnMaxTime() { |
| return ownMaxTime; |
| } |
| |
| public long getOwnMinTime() { |
| return ownMinTime; |
| } |
| |
| public double getTotalTimeStandardDeviation() { |
| return totalTimeStdDev; |
| } |
| |
| public long getTotalTimeAverage() { |
| return totalTimeAvg; |
| } |
| |
| public long getTotalMaxTime() { |
| return totalMaxTime; |
| } |
| |
| public long getTotalMinTime() { |
| return totalMinTime; |
| } |
| |
| public long getTicks() { |
| return ticks; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("\town time standard deviation : ").append(ownTimeStdDev); |
| sb.append("\n\town average time : ").append(ownTimeAvg); |
| sb.append("\n\town max time : ").append(ownMaxTime); |
| sb.append("\n\town min time : ").append(ownMinTime); |
| sb.append("\n\ttotal time standard deviation : ").append(totalTimeStdDev); |
| sb.append("\n\ttotal average time : ").append(totalTimeAvg); |
| sb.append("\n\ttotal max time : ").append(totalMaxTime); |
| sb.append("\n\ttotal min time : ").append(totalMinTime); |
| sb.append("\n\tticks : ").append(ticks); |
| return sb.toString(); |
| } |
| } |
| |
| public Map<String, Long> getCounters() { |
| return counters; |
| } |
| |
| public Map<String, Object> getVariables() { |
| return variables; |
| } |
| |
| public Map<String, Double> getSamplers() { |
| return samplers; |
| } |
| |
| public Map<String, Timer> getTimers() { |
| return timers; |
| } |
| } |
| |
| /** |
| * Return the Oozie instrumentation. If null is returned, then try {@link #getMetrics()}. |
| * |
| * @return the Oozie intstrumentation or null. |
| * @throws OozieClientException throw if the intstrumentation could not be retrieved. |
| */ |
| public Instrumentation getInstrumentation() throws OozieClientException { |
| return new GetInstrumentation().call(); |
| } |
| |
| /** |
| * Check if the string is not null or not empty. |
| * |
| * @param str the string to check |
| * @param name the name to present in the error message |
| * @return string |
| */ |
| public static String notEmpty(String str, String name) { |
| if (str == null) { |
| throw new IllegalArgumentException(name + " cannot be null"); |
| } |
| if (str.length() == 0) { |
| throw new IllegalArgumentException(name + " cannot be empty"); |
| } |
| return str; |
| } |
| |
| /** |
| * Check if the object is not null. |
| * |
| * @param <T> the type of the object |
| * @param obj the object to check |
| * @param name the name to present in the error message |
| * @return string |
| */ |
| public static <T> T notNull(T obj, String name) { |
| if (obj == null) { |
| throw new IllegalArgumentException(name + " cannot be null"); |
| } |
| return obj; |
| } |
| } |