blob: 1c6966fd1bc452ed98b5f5bf58a684a466cd0889 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.client;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import org.apache.oozie.BuildInfo;
import org.apache.oozie.cli.ValidationUtil;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.JsonToBean;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.client.retry.ConnectionRetriableClient;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Reader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
/**
* Client API to submit and manage Oozie workflow jobs against an Oozie instance.
* <p>
* This class is thread safe.
* <p>
* Syntax for filter for the {@link #getJobsInfo(String)} {@link #getJobsInfo(String, int, int)} methods:
* <code>[NAME=VALUE][;NAME=VALUE]*</code>.
* <p>
* Valid filter names are:
* <ul>
* <li>name: the workflow application name from the workflow definition.</li>
* <li>user: the user that submitted the job.</li>
* <li>group: the group for the job.</li>
* <li>status: the status of the job.</li>
* </ul>
* <p>
* The query will do an AND among all the filter names. The query will do an OR among all the filter values for the same
* name. Multiple values must be specified as different name value pairs.
*/
public class OozieClient {
// Web-services protocol versions. validateWSVersion() checks the server for these, newest first.
public static final long WS_PROTOCOL_VERSION_0 = 0;
public static final long WS_PROTOCOL_VERSION_1 = 1;
public static final long WS_PROTOCOL_VERSION = 2; // pointer to current version
// Property-name constants. USER_NAME is the key set by createConfiguration();
// the rest are presumably keys of the job configuration as well -- confirm against callers.
public static final String USER_NAME = "user.name";
@Deprecated
public static final String GROUP_NAME = "group.name";
public static final String JOB_ACL = "oozie.job.acl";
public static final String APP_PATH = "oozie.wf.application.path";
public static final String COORDINATOR_APP_PATH = "oozie.coord.application.path";
public static final String BUNDLE_APP_PATH = "oozie.bundle.application.path";
public static final String BUNDLE_ID = "oozie.bundle.id";
public static final String EXTERNAL_ID = "oozie.wf.external.id";
public static final String WORKFLOW_NOTIFICATION_PROXY = "oozie.wf.workflow.notification.proxy";
public static final String WORKFLOW_NOTIFICATION_URL = "oozie.wf.workflow.notification.url";
public static final String ACTION_NOTIFICATION_URL = "oozie.wf.action.notification.url";
public static final String COORD_ACTION_NOTIFICATION_URL = "oozie.coord.action.notification.url";
public static final String COORD_ACTION_NOTIFICATION_PROXY = "oozie.coord.action.notification.proxy";
public static final String RERUN_SKIP_NODES = "oozie.wf.rerun.skip.nodes";
public static final String RERUN_FAIL_NODES = "oozie.wf.rerun.failnodes";
public static final String LOG_TOKEN = "oozie.wf.log.token";
public static final String ACTION_MAX_RETRIES = "oozie.wf.action.max.retries";
public static final String ACTION_RETRY_INTERVAL = "oozie.wf.action.retry.interval";
// Filter-name constants. The class Javadoc describes the NAME=VALUE filter syntax.
public static final String FILTER_USER = "user";
public static final String FILTER_TEXT = "text";
public static final String FILTER_GROUP = "group";
public static final String FILTER_NAME = "name";
public static final String FILTER_STATUS = "status";
public static final String FILTER_NOMINAL_TIME = "nominaltime";
public static final String FILTER_FREQUENCY = "frequency";
public static final String FILTER_ID = "id";
public static final String FILTER_UNIT = "unit";
public static final String FILTER_JOBID = "jobid";
public static final String FILTER_APPNAME = "appname";
public static final String FILTER_CREATED_TIME_START = "startcreatedtime";
public static final String FILTER_CREATED_TIME_END = "endcreatedtime";
// SLA alert keys. The two derived names are built by appending a suffix to SLA_DISABLE_ALERT.
public static final String SLA_DISABLE_ALERT = "oozie.sla.disable.alerts";
public static final String SLA_ENABLE_ALERT = "oozie.sla.enable.alerts";
public static final String SLA_DISABLE_ALERT_OLDER_THAN = SLA_DISABLE_ALERT + ".older.than";
public static final String SLA_DISABLE_ALERT_COORD = SLA_DISABLE_ALERT + ".coord";
// Names used in change values (presumably for change(String, String) -- confirm).
public static final String CHANGE_VALUE_ENDTIME = "endtime";
public static final String CHANGE_VALUE_PAUSETIME = "pausetime";
public static final String CHANGE_VALUE_CONCURRENCY = "concurrency";
public static final String CHANGE_VALUE_STATUS = "status";
// Miscellaneous property names.
public static final String LIBPATH = "oozie.libpath";
public static final String CONFIG_PATH = "oozie.default.configuration.path";
public static final String USE_SYSTEM_LIBPATH = "oozie.use.system.libpath";
public static final String OOZIE_SUSPEND_ON_NODES = "oozie.suspend.on.nodes";
public static final String FILTER_SORT_BY = "sortby";
// Property under which the job-submission callable stores the generated XML before sending the configuration.
public static final String CONFIG_KEY_GENERATED_XML = "oozie.jobs.api.generated.xml";
/**
 * Sort keys; each constant is paired with a longer name exposed through {@link #getFullname()}.
 */
public enum SORT_BY {
    createdTime("createdTimestamp"),
    lastModifiedTime("lastModifiedTimestamp");

    /** Full name associated with this sort key. */
    private final String fullName;

    SORT_BY(String fullName) {
        this.fullName = fullName;
    }

    /**
     * @return the full name associated with this sort key.
     */
    public String getFullname() {
        return this.fullName;
    }
}
/**
 * System modes known to the client.
 */
public enum SYSTEM_MODE {
    NORMAL,
    NOWEBSERVICE,
    SAFEMODE
}
/** String forms of the workflow job statuses listed in the static initializer below. */
private static final Set<String> COMPLETED_WF_STATUSES = new HashSet<>();
/** String forms of the coordinator/bundle job statuses listed in the static initializer below. */
private static final Set<String> COMPLETED_COORD_AND_BUNDLE_STATUSES = new HashSet<>();
/** String forms of the coordinator action statuses listed in the static initializer below. */
private static final Set<String> COMPLETED_COORD_ACTION_STATUSES = new HashSet<>();
static {
    Collections.addAll(COMPLETED_WF_STATUSES,
            WorkflowJob.Status.FAILED.toString(),
            WorkflowJob.Status.KILLED.toString(),
            WorkflowJob.Status.SUCCEEDED.toString());
    Collections.addAll(COMPLETED_COORD_AND_BUNDLE_STATUSES,
            Job.Status.FAILED.toString(),
            Job.Status.KILLED.toString(),
            Job.Status.SUCCEEDED.toString(),
            Job.Status.DONEWITHERROR.toString(),
            Job.Status.IGNORED.toString());
    Collections.addAll(COMPLETED_COORD_ACTION_STATUSES,
            CoordinatorAction.Status.FAILED.toString(),
            CoordinatorAction.Status.IGNORED.toString(),
            CoordinatorAction.Status.KILLED.toString(),
            CoordinatorAction.Status.SKIPPED.toString(),
            CoordinatorAction.Status.SUCCEEDED.toString(),
            CoordinatorAction.Status.TIMEDOUT.toString());
}
/**
* debugMode =0 means no debugging. 1 means debugging on.
*/
public int debugMode = 0;
// Retry count handed to ConnectionRetriableClient by createRetryableConnection().
private int retryCount = 4;
// Base URL of the Oozie instance; the public constructor guarantees a trailing "/".
private String baseUrl;
// Versioned URL (baseUrl + "v<N>/"), assigned by validateWSVersion().
private String protocolUrl;
// Set to true once validateWSVersion() has completed successfully; later calls return immediately.
private boolean validatedVersion = false;
// Protocol versions reported by the server, as returned by getSupportedProtocolVersions().
private JSONArray supportedVersions;
// HTTP headers applied to every connection built by createConnection().
private final Map<String, String> headers = new HashMap<>();
// Per-thread user name set by doAs() and read by prepareParams().
private static final ThreadLocal<String> USER_NAME_TL = new ThreadLocal<>();
/**
 * Runs a callable while impersonating another user on the Oozie server. The current
 * user must be configured as a proxyuser in Oozie.
 * <p>
 * IMPORTANT: only Oozie client requests issued from within the callable are impersonated.
 * The user name is kept in a thread-local for the duration of the call and is removed
 * afterwards, whether the callable returns or throws.
 *
 * @param <T> the type of the callable
 * @param userName user to impersonate.
 * @param callable callable with {@link OozieClient} calls impersonating the specified user.
 * @return any response returned by the {@link Callable#call()} method.
 * @throws Exception thrown by the {@link Callable#call()} method.
 */
public static <T> T doAs(String userName, Callable<T> callable) throws Exception {
    notEmpty(userName, "userName");
    notNull(callable, "callable");
    try {
        USER_NAME_TL.set(userName);
        final T outcome = callable.call();
        return outcome;
    }
    finally {
        // Always clear the impersonated user so it cannot leak into later calls on this thread.
        USER_NAME_TL.remove();
    }
}
/**
* No-argument constructor for subclasses; it leaves the base URL unset.
*/
protected OozieClient() {
}
/**
* Create a Workflow client instance.
*
* @param oozieUrl URL of the Oozie instance it will interact with.
*/
public OozieClient(String oozieUrl) {
this.baseUrl = notEmpty(oozieUrl, "oozieUrl");
if (!this.baseUrl.endsWith("/")) {
this.baseUrl += "/";
}
}
/**
* Return the Oozie URL of the workflow client instance.
* <p>
* This URL is the base URL of the Oozie system, with no protocol versioning.
*
* @return the Oozie URL of the workflow client instance.
*/
public String getOozieUrl() {
return baseUrl;
}
/**
* Return the Oozie URL used by the client and server for WS communications.
* <p>
* This URL is the original URL plus the versioning element path. The protocol version is
* validated against the server (see {@link #validateWSVersion()}) before the URL is returned.
*
* @return the Oozie URL used by the client and server for communication.
* @throws OozieClientException thrown if the client and the server are not protocol compatible.
*/
public String getProtocolUrl() throws OozieClientException {
validateWSVersion();
return protocolUrl;
}
/**
* Return the current debug mode.
*
* @return current debug mode: 0 means no debugging, 1 means debugging on.
*/
public int getDebugMode() {
return debugMode;
}
/**
* Set debug mode.
* <p>
* Code in this class treats any value greater than 0 as "debugging on".
*
* @param debugMode : 0 means no debugging. 1 means debugging
*/
public void setDebugMode(int debugMode) {
this.debugMode = debugMode;
}
/**
* Return the retry count passed to the retrying connection helper.
*
* @return the configured retry count.
*/
public int getRetryCount() {
return retryCount;
}
/**
* Set the retry count passed to the retrying connection helper.
*
* @param retryCount the new retry count; the value is stored without validation.
*/
public void setRetryCount(int retryCount) {
this.retryCount = retryCount;
}
/**
 * Build the versioned base URL for an explicitly requested protocol version.
 * <p>
 * The list of supported versions is fetched from the server when it has not been loaded yet.
 *
 * @param protocolVersion protocol version the caller wants to use.
 * @return the base URL followed by {@code v<protocolVersion>/}.
 * @throws OozieClientException if the version list cannot be obtained, the requested version
 *         is not in that list, or an I/O error occurs.
 */
private String getBaseURLForVersion(long protocolVersion) throws OozieClientException {
    try {
        if (supportedVersions == null) {
            supportedVersions = getSupportedProtocolVersions();
        }
        if (supportedVersions == null) {
            throw new OozieClientException("HTTP error", "no response message");
        }
        if (!supportedVersions.contains(protocolVersion)) {
            throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, "Protocol version "
                    + protocolVersion + " is not supported");
        }
        return baseUrl + "v" + protocolVersion + "/";
    }
    catch (IOException ioFailure) {
        throw new OozieClientException(OozieClientException.IO_ERROR, ioFailure);
    }
}
/**
 * Validate that the Oozie client and server instances are protocol compatible.
 * <p>
 * On the first successful call the list of versions supported by the server is fetched and the
 * newest version known to this client ({@link #WS_PROTOCOL_VERSION}, then
 * {@link #WS_PROTOCOL_VERSION_1}, then {@link #WS_PROTOCOL_VERSION_0}) that the server also
 * supports is selected for the protocol URL. Later calls return immediately.
 *
 * @throws OozieClientException thrown if the client and the server are not protocol compatible,
 *         if the server returns no version list, or if an I/O error occurs.
 */
public synchronized void validateWSVersion() throws OozieClientException {
    if (validatedVersion) {
        return;
    }
    try {
        supportedVersions = getSupportedProtocolVersions();
        if (supportedVersions == null) {
            throw new OozieClientException("HTTP error", "no response message");
        }
        // Candidates in order of preference: newest first.
        final long[] candidates = {WS_PROTOCOL_VERSION, WS_PROTOCOL_VERSION_1, WS_PROTOCOL_VERSION_0};
        String negotiatedUrl = null;
        for (long candidate : candidates) {
            if (supportedVersions.contains(candidate)) {
                negotiatedUrl = baseUrl + "v" + candidate + "/";
                break;
            }
        }
        if (negotiatedUrl == null) {
            StringBuilder msg = new StringBuilder();
            msg.append("Supported version [").append(WS_PROTOCOL_VERSION)
                    .append("] or less, Unsupported versions[");
            String separator = "";
            for (Object version : supportedVersions) {
                msg.append(separator).append(version);
                // Fix: the separator used to stay empty, so the versions ran together.
                separator = ", ";
            }
            msg.append("]");
            throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, msg.toString());
        }
        protocolUrl = negotiatedUrl;
    }
    catch (IOException ex) {
        throw new OozieClientException(OozieClientException.IO_ERROR, ex);
    }
    validatedVersion = true;
}
/**
 * Ask the server which protocol versions it supports.
 * <p>
 * Issues a GET on the versions resource below the base URL and parses the UTF-8 JSON response.
 * The response reader is closed once parsing is finished.
 *
 * @return the parsed JSON array of versions.
 * @throws IOException on connection or read failures.
 * @throws OozieClientException if the server answers with a status other than HTTP 200.
 */
private JSONArray getSupportedProtocolVersions() throws IOException, OozieClientException {
    final URL url = new URL(baseUrl + RestConstants.VERSIONS);
    final HttpURLConnection conn = createRetryableConnection(url, "GET");
    if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
        handleError(conn); // always throws
        return null;
    }
    try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
        return (JSONArray) JSONValue.parse(reader);
    }
}
/**
 * Create a configuration whose only entry is {@link #USER_NAME}, set to the value
 * returned by {@code getUserName()}.
 *
 * @return a new configuration containing only the user name.
 */
public Properties createConfiguration() {
    final Properties configuration = new Properties();
    final String currentUser = getUserName();
    configuration.setProperty(USER_NAME, currentUser);
    return configuration;
}
/**
 * Set a HTTP header to be used in the WS requests by the workflow instance.
 * <p>
 * The name is checked with {@code notEmpty} and the value with {@code notNull} before storing.
 *
 * @param name header name.
 * @param value header value.
 */
public void setHeader(String name, String value) {
    final String headerName = notEmpty(name, "name");
    final String headerValue = notNull(value, "value");
    headers.put(headerName, headerValue);
}
/**
 * Get the value of a set HTTP header from the workflow instance.
 *
 * @param name header name.
 * @return header value, <code>null</code> if not set.
 */
public String getHeader(String name) {
    final String headerName = notEmpty(name, "name");
    return headers.get(headerName);
}
/**
* Get the HTTP headers currently set on this client.
* <p>
* The internal map itself is returned, not a copy, so changes made through the returned
* map affect the headers sent by this client.
*
* @return map of header key and value
*/
public Map<String, String> getHeaders() {
return headers;
}
/**
 * Remove a HTTP header from the workflow client instance.
 *
 * @param name header name.
 */
public void removeHeader(String name) {
    final String headerName = notEmpty(name, "name");
    headers.remove(headerName);
}
/**
 * Return an iterator with all the header names set in the workflow instance.
 * <p>
 * The iterator is obtained from an unmodifiable view, so it does not support removal.
 *
 * @return header names.
 */
public Iterator<String> getHeaderNames() {
    final Set<String> readOnlyNames = Collections.unmodifiableSet(headers.keySet());
    return readOnlyNames.iterator();
}
/**
 * Assemble the request URL for a collection/resource pair plus query parameters.
 * <p>
 * The protocol version is validated first. When {@code protocolVersion} is {@code null} the
 * negotiated protocol URL is used, otherwise the base URL for the requested version.
 * Parameters with a {@code null} value are skipped; names and values are URL-encoded as UTF-8.
 *
 * @param protocolVersion explicit protocol version, or {@code null} for the negotiated one.
 * @param collection collection path appended after the versioned base URL.
 * @param resource resource appended after a "/", ignored when {@code null} or empty.
 * @param parameters query parameters, in iteration order.
 * @return the assembled URL.
 * @throws IOException if the URL is malformed or encoding fails.
 * @throws OozieClientException if protocol validation fails.
 */
private URL createURL(Long protocolVersion, String collection, String resource, Map<String, String> parameters)
        throws IOException, OozieClientException {
    validateWSVersion();
    final String root = (protocolVersion == null) ? protocolUrl : getBaseURLForVersion(protocolVersion);
    final StringBuilder target = new StringBuilder();
    target.append(root).append(collection);
    if (resource != null && !resource.isEmpty()) {
        target.append("/").append(resource);
    }
    final String utf8 = StandardCharsets.UTF_8.name();
    String separator = "?";
    for (Map.Entry<String, String> entry : parameters.entrySet()) {
        final String value = entry.getValue();
        if (value == null) {
            continue;
        }
        target.append(separator)
                .append(URLEncoder.encode(entry.getKey(), utf8))
                .append("=")
                .append(URLEncoder.encode(value, utf8));
        separator = "&";
    }
    return new URL(target.toString());
}
/**
 * Check whether a request URL may be sent to the negotiated server version.
 * <p>
 * When the protocol URL points at "v0", URLs containing "dryrun", "jobtype=c" or
 * "systemmode" are rejected; everything else is accepted.
 *
 * @param url the full request URL as a string.
 * @return {@code false} if the request is one of the rejected kinds on a v0 server,
 *         {@code true} otherwise.
 */
private boolean validateCommand(String url) {
    final boolean talkingToV0 = protocolUrl.contains(baseUrl + "v0");
    if (!talkingToV0) {
        return true;
    }
    final boolean rejected = url.contains("dryrun") || url.contains("jobtype=c") || url.contains("systemmode");
    return !rejected;
}
/**
 * Create retryable http connection to oozie server.
 * <p>
 * Delegates to {@link #createConnection(URL, String)} through a
 * {@code ConnectionRetriableClient} configured with {@link #getRetryCount()}.
 *
 * @param url URL to create connection to
 * @param method method to use - GET, POST, PUT
 * @return connection
 * @throws IOException in case of an exception during connection setup
 */
protected HttpURLConnection createRetryableConnection(final URL url, final String method) throws IOException {
    final ConnectionRetriableClient retryingClient = new ConnectionRetriableClient(getRetryCount()) {
        @Override
        public Object doExecute(URL target, String httpMethod) throws IOException, OozieClientException {
            return createConnection(target, httpMethod);
        }
    };
    final Object connection = retryingClient.execute(url, method);
    return (HttpURLConnection) connection;
}
/**
 * Create http connection to oozie server.
 * <p>
 * Output is enabled for POST and PUT, and every header set on this client is copied
 * onto the connection.
 *
 * @param url URL to create connection to
 * @param method method to use - GET, POST, PUT
 * @return connection
 * @throws IOException in case of an exception during connection setup
 * @throws OozieClientException if the connection can't be set up
 */
protected HttpURLConnection createConnection(URL url, String method) throws IOException, OozieClientException {
    final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
    connection.setRequestMethod(method);
    final boolean sendsBody = method.equals("POST") || method.equals("PUT");
    if (sendsBody) {
        connection.setDoOutput(true);
    }
    for (Map.Entry<String, String> entry : headers.entrySet()) {
        final String headerName = entry.getKey();
        final String headerValue = entry.getValue();
        connection.setRequestProperty(headerName, headerValue);
    }
    return connection;
}
/**
 * Base class for a single REST request: it builds the URL, checks that the command is
 * supported by the target server, opens a retryable connection and hands it to
 * {@link #call(HttpURLConnection)} for response handling.
 *
 * @param <T> type of the value produced from the response.
 */
protected abstract class ClientCallable<T> implements Callable<T> {
    private final String method;
    private final String collection;
    private final String resource;
    private final Map<String, String> params;
    private final Long protocolVersion;

    /**
     * Request against the negotiated protocol version.
     */
    public ClientCallable(String method, String collection, String resource, Map<String, String> params) {
        this(method, null, collection, resource, params);
    }

    /**
     * Request against an explicit protocol version ({@code null} means the negotiated one).
     */
    public ClientCallable(String method, Long protocolVersion, String collection, String resource, Map<String, String> params) {
        this.params = params;
        this.resource = resource;
        this.collection = collection;
        this.protocolVersion = protocolVersion;
        this.method = method;
    }

    /**
     * Execute the request.
     *
     * @return the value produced by {@link #call(HttpURLConnection)}.
     * @throws OozieClientException if the command is not supported by the server, on I/O
     *         errors, or when response handling fails.
     */
    @Override
    public T call() throws OozieClientException {
        try {
            final URL url = createURL(protocolVersion, collection, resource, params);
            if (!validateCommand(url.toString())) {
                System.out.println("Option not supported in target server. Supported only on Oozie-2.0 or greater."
                        + " Use 'oozie help' for details");
                throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, new Exception());
            }
            if (getDebugMode() > 0) {
                System.out.println(method + " " + url);
            }
            final HttpURLConnection connection = createRetryableConnection(url, method);
            return call(connection);
        }
        catch (IOException ioFailure) {
            throw new OozieClientException(OozieClientException.IO_ERROR, ioFailure);
        }
    }

    /**
     * Handle the server response for this request.
     *
     * @param conn the opened connection.
     * @return the value extracted from the response.
     * @throws IOException on read/write failures.
     * @throws OozieClientException if the server reports an error.
     */
    protected abstract T call(HttpURLConnection conn) throws IOException, OozieClientException;
}
/**
 * Request whose JSON object response is returned as a map of string keys to string values.
 */
protected abstract class MapClientCallable extends ClientCallable<Map<String, String>> {
    MapClientCallable(String method, String collection, String resource, Map<String, String> params) {
        super(method, collection, resource, params);
    }

    /**
     * Parse the UTF-8 JSON object in the response into a map. The response reader is closed
     * once parsing is finished.
     *
     * @param conn the opened connection.
     * @return the response entries; keys and values are cast to {@code String}.
     * @throws IOException on read failures.
     * @throws OozieClientException if the server answers with a status other than HTTP 200.
     */
    @Override
    protected Map<String, String> call(HttpURLConnection conn) throws IOException, OozieClientException {
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn); // always throws
            return null;
        }
        final JSONObject json;
        try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            json = (JSONObject) JSONValue.parse(reader);
        }
        Map<String, String> map = new HashMap<>();
        for (Object key : json.keySet()) {
            map.put((String) key, (String) json.get(key));
        }
        return map;
    }
}
/**
 * Turn a failed HTTP response into an {@link OozieClientException}; this method never
 * returns normally.
 * <p>
 * The error code and message come from the Oozie error headers when present; otherwise the
 * HTTP status code and the HTTP response message are used.
 *
 * @param conn connection holding the failed response.
 * @throws IOException if the response cannot be read.
 * @throws OozieClientException always, describing the failure.
 */
static void handleError(HttpURLConnection conn) throws IOException, OozieClientException {
    final int status = conn.getResponseCode();
    final String errorHeader = conn.getHeaderField(RestConstants.OOZIE_ERROR_CODE);
    final String messageHeader = conn.getHeaderField(RestConstants.OOZIE_ERROR_MESSAGE);
    final String error = (errorHeader != null) ? errorHeader : "HTTP error code: " + status;
    final String message = (messageHeader != null) ? messageHeader : conn.getResponseMessage();
    throw new OozieClientException(error, message);
}
/**
 * Build an insertion-ordered parameter map from alternating name/value arguments.
 * <p>
 * When the calling thread is inside {@code doAs}, the impersonated user name is added
 * under the do-as parameter.
 *
 * @param params alternating names and values; callers are expected to pass an even count.
 * @return the parameter map.
 */
static Map<String, String> prepareParams(String... params) {
    final Map<String, String> result = new LinkedHashMap<>();
    int index = 0;
    while (index < params.length) {
        result.put(params[index], params[index + 1]);
        index += 2;
    }
    final String impersonatedUser = USER_NAME_TL.get();
    if (impersonatedUser != null) {
        result.put(RestConstants.DO_AS_PARAM, impersonatedUser);
    }
    return result;
}
/**
 * Serialize properties as a {@code <configuration>} XML document onto a stream.
 * <p>
 * Each string property becomes a {@code <property>} element with trimmed {@code <name>} and
 * {@code <value>} children. When debug mode is on, the document is also printed to
 * standard output.
 *
 * @param props properties to serialize.
 * @param out stream receiving the XML; it is not closed by this method.
 * @throws IOException wrapping any failure raised while building or writing the document.
 */
public void writeToXml(Properties props, OutputStream out) throws IOException {
    try {
        final Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
        final Element root = document.createElement("configuration");
        document.appendChild(root);
        root.appendChild(document.createTextNode("\n"));
        // stringPropertyNames() omits entries whose key or value is not a String.
        for (String key : props.stringPropertyNames()) {
            final Element nameElement = document.createElement("name");
            nameElement.appendChild(document.createTextNode(key.trim()));
            final Element valueElement = document.createElement("value");
            valueElement.appendChild(document.createTextNode(props.getProperty(key).trim()));
            final Element property = document.createElement("property");
            property.appendChild(nameElement);
            property.appendChild(valueElement);
            root.appendChild(property);
            root.appendChild(document.createTextNode("\n"));
        }
        final TransformerFactory factory = TransformerFactory.newInstance();
        factory.setFeature("http://javax.xml.XMLConstants/feature/secure-processing", true);
        final Transformer serializer = factory.newTransformer();
        final DOMSource input = new DOMSource(document);
        serializer.transform(input, new StreamResult(out));
        if (getDebugMode() > 0) {
            serializer.transform(input, new StreamResult(System.out));
            System.out.println();
        }
    }
    catch (Exception failure) {
        throw new IOException(failure);
    }
}
/**
 * Request that sends a job configuration as XML: a POST to the jobs collection for
 * submit/run/dryrun, or a PUT on a job resource for rerun.
 */
private class JobSubmit extends ClientCallable<String> {
    private final Properties conf;
    private final String generatedXml;

    /**
     * Submit a job, optionally starting it.
     *
     * @param conf job configuration, must not be null.
     * @param start when true the start action parameter is added to the request.
     * @param generatedXml optional XML stored in the configuration before sending; may be null or empty.
     */
    JobSubmit(final Properties conf, final boolean start, final String generatedXml) {
        super("POST", RestConstants.JOBS, "",
                (start)
                        ? prepareParams(RestConstants.ACTION_PARAM,
                                RestConstants.JOB_ACTION_START,
                                RestConstants.USER_PARAM,
                                getUserName())
                        : prepareParams(RestConstants.USER_PARAM,
                                getUserName()));
        this.conf = notNull(conf, "conf");
        this.generatedXml = generatedXml;
    }

    /**
     * Rerun an existing job.
     *
     * @param jobId id of the job to rerun, must not be empty.
     * @param conf rerun configuration, must not be null.
     */
    JobSubmit(String jobId, Properties conf) {
        super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
                RestConstants.JOB_ACTION_RERUN, RestConstants.USER_PARAM, getUserName()));
        this.conf = notNull(conf, "conf");
        this.generatedXml = null;
    }

    /**
     * Dry-run a job.
     *
     * @param conf job configuration, must not be null.
     * @param jobActionDryrun not read; it only distinguishes this constructor from the others.
     */
    public JobSubmit(Properties conf, String jobActionDryrun) {
        super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.ACTION_PARAM,
                RestConstants.JOB_ACTION_DRYRUN, RestConstants.USER_PARAM, getUserName()));
        this.conf = notNull(conf, "conf");
        this.generatedXml = null;
    }

    /**
     * Send the configuration and read the job id from the response.
     * <p>
     * The request output stream and the response reader are closed after use.
     *
     * @param conn the opened connection.
     * @return the job id on HTTP 201, {@code null} on HTTP 200.
     * @throws IOException on read/write failures.
     * @throws OozieClientException for any other response status.
     */
    @Override
    protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
        conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
        if (!Strings.isNullOrEmpty(generatedXml)) {
            conf.setProperty(CONFIG_KEY_GENERATED_XML, generatedXml);
        }
        try (OutputStream body = conn.getOutputStream()) {
            writeToXml(conf, body);
        }
        final int responseCode = conn.getResponseCode();
        if (responseCode == HttpURLConnection.HTTP_CREATED) {
            try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
                JSONObject json = (JSONObject) JSONValue.parse(reader);
                return (String) json.get(JsonTags.JOB_ID);
            }
        }
        if (responseCode != HttpURLConnection.HTTP_OK) {
            handleError(conn);
        }
        return null;
    }
}
/**
* Submit a workflow job.
* <p>
* The job is submitted without the start action; see {@link #run(Properties)} to submit and start.
*
* @param conf job configuration.
* @return the job Id.
* @throws OozieClientException thrown if the job could not be submitted.
*/
public String submit(Properties conf) throws OozieClientException {
return (new JobSubmit(conf, false, null)).call();
}
/**
* Submit a job together with generated XML.
* <p>
* When {@code generatedXml} is neither null nor empty it is stored in {@code conf} under
* {@link #CONFIG_KEY_GENERATED_XML} before the configuration is sent.
*
* @param conf job configuration.
* @param generatedXml generated XML to send along with the configuration.
* @return the job Id.
* @throws OozieClientException thrown if the job could not be submitted.
*/
public String submit(final Properties conf, final String generatedXml) throws OozieClientException {
return (new JobSubmit(conf, false, generatedXml)).call();
}
/**
 * PUT request that applies an action to a single job and expects no response body.
 */
private class JobAction extends ClientCallable<Void> {
    /**
     * @param jobId id of the job, must not be empty.
     * @param action value of the action parameter.
     */
    JobAction(String jobId, String action) {
        super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action));
    }

    /**
     * @param jobId id of the job, must not be empty.
     * @param action value of the action parameter.
     * @param params value sent as the job change value parameter.
     */
    JobAction(String jobId, String action, String params) {
        super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action,
                RestConstants.JOB_CHANGE_VALUE, params));
    }

    /**
     * @return always {@code null}.
     * @throws OozieClientException if the server answers with a status other than HTTP 200.
     */
    @Override
    protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
        final int responseCode = conn.getResponseCode();
        if (responseCode != HttpURLConnection.HTTP_OK) {
            handleError(conn);
        }
        return null;
    }
}
/**
 * PUT request that applies an action to the jobs selected by a filter, job type,
 * offset and length, and returns the server's JSON response.
 */
private class JobsAction extends ClientCallable<JSONObject> {
    /**
     * @param action value of the action parameter.
     * @param filter job filter expression.
     * @param jobType value of the job type parameter.
     * @param start value of the offset parameter.
     * @param len value of the length parameter.
     */
    JobsAction(String action, String filter, String jobType, int start, int len) {
        super("PUT", RestConstants.JOBS, "",
                prepareParams(RestConstants.ACTION_PARAM, action,
                        RestConstants.JOB_FILTER_PARAM, filter, RestConstants.JOBTYPE_PARAM, jobType,
                        RestConstants.OFFSET_PARAM, Integer.toString(start),
                        RestConstants.LEN_PARAM, Integer.toString(len)));
    }

    /**
     * Parse the UTF-8 JSON response. The response reader is closed once parsing is finished.
     *
     * @return the parsed JSON object.
     * @throws IOException on read failures.
     * @throws OozieClientException if the server answers with a status other than HTTP 200.
     */
    @Override
    protected JSONObject call(HttpURLConnection conn) throws IOException, OozieClientException {
        conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn); // always throws
            return null;
        }
        try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            return (JSONObject) JSONValue.parse(reader);
        }
    }
}
/**
* Update a coordinator definition, sending the given properties with the request.
*
* @param jobId the coordinator job id
* @param conf the configuration sent as the request body
* @param dryrun value sent as the dryrun request parameter
* @param showDiff value sent as the showdiff request parameter
* @return the diff reported by the server, or an empty string when the response carries no update section
* @throws OozieClientException thrown if the request fails
*/
public String updateCoord(String jobId, Properties conf, String dryrun, String showDiff)
throws OozieClientException {
return (new UpdateCoord(jobId, conf, dryrun, showDiff)).call();
}
/**
* Update a coordinator definition without properties; an empty configuration is sent.
*
* @param jobId the coordinator job id
* @param dryrun value sent as the dryrun request parameter
* @param showDiff value sent as the showdiff request parameter
* @return the diff reported by the server, or an empty string when the response carries no update section
* @throws OozieClientException thrown if the request fails
*/
public String updateCoord(String jobId, String dryrun, String showDiff) throws OozieClientException {
return (new UpdateCoord(jobId, dryrun, showDiff)).call();
}
/**
 * PUT request that updates a coordinator definition and returns the diff reported by the server.
 */
private class UpdateCoord extends ClientCallable<String> {
    private final Properties conf;

    /**
     * @param jobId coordinator job id, must not be empty.
     * @param conf configuration sent as the request body.
     * @param jobActionDryrun value of the dryrun parameter.
     * @param showDiff value of the showdiff parameter.
     */
    public UpdateCoord(String jobId, Properties conf, String jobActionDryrun, String showDiff) {
        super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
                RestConstants.JOB_COORD_UPDATE, RestConstants.JOB_ACTION_DRYRUN, jobActionDryrun,
                RestConstants.JOB_ACTION_SHOWDIFF, showDiff));
        this.conf = conf;
    }

    /**
     * Variant that sends an empty configuration.
     */
    public UpdateCoord(String jobId, String jobActionDryrun, String showDiff) {
        this(jobId, new Properties(), jobActionDryrun, showDiff);
    }

    /**
     * Send the configuration and extract the update diff from the JSON response.
     * <p>
     * The request output stream and the response reader are closed after use.
     *
     * @return the diff, or an empty string when the response has no update section.
     * @throws IOException on read/write failures.
     * @throws OozieClientException if the server answers with a status other than HTTP 200.
     */
    @Override
    protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
        conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
        try (OutputStream body = conn.getOutputStream()) {
            writeToXml(conf, body);
        }
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn); // always throws
            return null;
        }
        try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            JSONObject json = (JSONObject) JSONValue.parse(reader);
            JSONObject update = (JSONObject) json.get(JsonTags.COORD_UPDATE);
            if (update == null) {
                return "";
            }
            return (String) update.get(JsonTags.COORD_UPDATE_DIFF);
        }
    }
}
/**
* Dry-run a job: the configuration is posted with the dryrun action.
*
* @param conf Job configuration.
* @return the value of the job id field of the response when the server answers HTTP 201,
* <code>null</code> when it answers HTTP 200.
* @throws org.apache.oozie.client.OozieClientException thrown if the job could not be submitted.
*/
public String dryrun(Properties conf) throws OozieClientException {
return new JobSubmit(conf, RestConstants.JOB_ACTION_DRYRUN).call();
}
/**
* Start a workflow job.
* <p>
* Sends a PUT on the job resource with the start action.
*
* @param jobId job Id.
* @throws OozieClientException thrown if the job could not be started.
*/
public void start(String jobId) throws OozieClientException {
new JobAction(jobId, RestConstants.JOB_ACTION_START).call();
}
/**
* Submit and start a workflow job.
* <p>
* Same request as {@link #submit(Properties)} with the start action added.
*
* @param conf job configuration.
* @return the job Id.
* @throws OozieClientException thrown if the job could not be submitted.
*/
public String run(Properties conf) throws OozieClientException {
return (new JobSubmit(conf, true, null)).call();
}
/**
* Submit and start a job together with generated XML.
* <p>
* When {@code generatedXml} is neither null nor empty it is stored in {@code conf} under
* {@link #CONFIG_KEY_GENERATED_XML} before the configuration is sent.
*
* @param conf job configuration.
* @param generatedXml generated XML to send along with the configuration.
* @return the job Id.
* @throws OozieClientException thrown if the job could not be submitted.
*/
public String run(final Properties conf, final String generatedXml) throws OozieClientException {
return (new JobSubmit(conf, true, generatedXml)).call();
}
/**
* Rerun a workflow job.
* <p>
* Sends a PUT on the job resource with the rerun action and the configuration as body.
* Any job id in the response is discarded.
*
* @param jobId job Id to rerun.
* @param conf configuration information for the rerun.
* @throws OozieClientException thrown if the job could not be started.
*/
public void reRun(String jobId, Properties conf) throws OozieClientException {
new JobSubmit(jobId, conf).call();
}
/**
* Suspend a workflow job.
* <p>
* Sends a PUT on the job resource with the suspend action.
*
* @param jobId job Id.
* @throws OozieClientException thrown if the job could not be suspended.
*/
public void suspend(String jobId) throws OozieClientException {
new JobAction(jobId, RestConstants.JOB_ACTION_SUSPEND).call();
}
/**
* Resume a workflow job.
* <p>
* Sends a PUT on the job resource with the resume action.
*
* @param jobId job Id.
* @throws OozieClientException thrown if the job could not be resumed.
*/
public void resume(String jobId) throws OozieClientException {
new JobAction(jobId, RestConstants.JOB_ACTION_RESUME).call();
}
/**
* Kill a workflow/coord/bundle job.
* <p>
* Sends a PUT on the job resource with the kill action.
*
* @param jobId job Id.
* @throws OozieClientException thrown if the job could not be killed.
*/
public void kill(String jobId) throws OozieClientException {
new JobAction(jobId, RestConstants.JOB_ACTION_KILL).call();
}
/**
* Kill coordinator actions selected by a date or action-number range.
*
* @param jobId coordinator Job Id
* @param rangeType type 'date' if -date is used, 'action-num' if -action is used
* @param scope kill scope for date or action nums
* @return list of coordinator actions that underwent kill
* @throws OozieClientException thrown if some actions could not be killed.
*/
public List<CoordinatorAction> kill(String jobId, String rangeType, String scope) throws OozieClientException {
return new CoordActionsKill(jobId, rangeType, scope).call();
}
/**
* Apply an action to the jobs selected by a filter.
*
* @param actionType value sent as the action request parameter.
* @param filter job filter expression.
* @param jobType value sent as the job type request parameter.
* @param start value sent as the offset request parameter.
* @param len value sent as the length request parameter.
* @return the JSON object returned by the server.
* @throws OozieClientException thrown if the request fails.
*/
public JSONObject bulkModifyJobs(String actionType, String filter, String jobType, int start, int len)
throws OozieClientException {
return new JobsAction(actionType, filter, jobType, start, len).call();
}
/**
* Kill the jobs selected by a filter; calls {@code bulkModifyJobs} with the "kill" action.
*
* @param filter job filter expression.
* @param jobType value sent as the job type request parameter.
* @param start value sent as the offset request parameter.
* @param len value sent as the length request parameter.
* @return the JSON object returned by the server.
* @throws OozieClientException thrown if the request fails.
*/
public JSONObject killJobs(String filter, String jobType, int start, int len)
throws OozieClientException {
return bulkModifyJobs("kill", filter, jobType, start, len);
}
/**
* Suspend the jobs selected by a filter; calls {@code bulkModifyJobs} with the "suspend" action.
*
* @param filter job filter expression.
* @param jobType value sent as the job type request parameter.
* @param start value sent as the offset request parameter.
* @param len value sent as the length request parameter.
* @return the JSON object returned by the server.
* @throws OozieClientException thrown if the request fails.
*/
public JSONObject suspendJobs(String filter, String jobType, int start, int len)
throws OozieClientException {
return bulkModifyJobs("suspend", filter, jobType, start, len);
}
/**
* Resume the jobs selected by a filter; calls {@code bulkModifyJobs} with the "resume" action.
*
* @param filter job filter expression.
* @param jobType value sent as the job type request parameter.
* @param start value sent as the offset request parameter.
* @param len value sent as the length request parameter.
* @return the JSON object returned by the server.
* @throws OozieClientException thrown if the request fails.
*/
public JSONObject resumeJobs(String filter, String jobType, int start, int len)
throws OozieClientException {
return bulkModifyJobs("resume", filter, jobType, start, len);
}
/**
* Change a coordinator job.
* <p>
* Sends a PUT on the job resource with the change action and the given change value.
*
* @param jobId job Id.
* @param changeValue change value.
* @throws OozieClientException thrown if the job could not be changed.
*/
public void change(String jobId, String changeValue) throws OozieClientException {
new JobAction(jobId, RestConstants.JOB_ACTION_CHANGE, changeValue).call();
}
/**
* Ignore actions of a coordinator job.
*
* @param jobId coord job Id.
* @param scope list of coord actions to be ignored
* @return the list of ignored actions, or null if there are none
* @throws OozieClientException thrown if the job could not be changed.
*/
public List<CoordinatorAction> ignore(String jobId, String scope) throws OozieClientException {
return new CoordIgnore(jobId, RestConstants.JOB_COORD_SCOPE_ACTION, scope).call();
}
/**
 * GET request that fetches the info of a workflow job.
 */
private class JobInfo extends ClientCallable<WorkflowJob> {
    /**
     * @param jobId id of the job, must not be empty.
     * @param start value of the offset parameter.
     * @param len value of the length parameter.
     */
    JobInfo(String jobId, int start, int len) {
        super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
                RestConstants.JOB_SHOW_INFO, RestConstants.OFFSET_PARAM, Integer.toString(start),
                RestConstants.LEN_PARAM, Integer.toString(len)));
    }

    /**
     * Parse the UTF-8 JSON response into a workflow job bean. The response reader is closed
     * once parsing is finished.
     *
     * @return the workflow job built from the response.
     * @throws IOException on read failures.
     * @throws OozieClientException if the server answers with a status other than HTTP 200.
     */
    @Override
    protected WorkflowJob call(HttpURLConnection conn) throws IOException, OozieClientException {
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn); // always throws
            return null;
        }
        try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            JSONObject json = (JSONObject) JSONValue.parse(reader);
            return JsonToBean.createWorkflowJob(json);
        }
    }
}
private class JMSInfo extends ClientCallable<JMSConnectionInfo> {
JMSInfo() {
super("GET", RestConstants.ADMIN, RestConstants.ADMIN_JMS_INFO, prepareParams());
}
protected JMSConnectionInfo call(HttpURLConnection conn) throws IOException, OozieClientException {
if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8);
JSONObject json = (JSONObject) JSONValue.parse(reader);
return JsonToBean.createJMSConnectionInfo(json);
}
else {
handleError(conn);
}
return null;
}
}
/**
 * GET request on {@code RestConstants.JOB} that loads a single workflow action
 * ({@code JOB_SHOW_PARAM = JOB_SHOW_INFO}) addressed by its action id.
 */
private class WorkflowActionInfo extends ClientCallable<WorkflowAction> {
    WorkflowActionInfo(String actionId) {
        super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM,
                RestConstants.JOB_SHOW_INFO));
    }

    /**
     * Parses an HTTP 200 response body into a {@link WorkflowAction}; any other status is handed to
     * {@code handleError}, and {@code null} is returned if that call returns normally.
     */
    @Override
    protected WorkflowAction call(HttpURLConnection conn) throws IOException, OozieClientException {
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        // try-with-resources closes the response reader (and the stream beneath it) after parsing.
        try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            JSONObject json = (JSONObject) JSONValue.parse(reader);
            return JsonToBean.createWorkflowAction(json);
        }
    }
}
/**
 * GET request on {@code RestConstants.JOB} that loads the retries of a workflow action
 * ({@code JOB_SHOW_PARAM = JOB_SHOW_ACTION_RETRIES_PARAM}).
 */
private class WorkflowActionRetriesInfo extends ClientCallable<List<Map<String, String>>> {
    WorkflowActionRetriesInfo(String actionId) {
        super("GET", RestConstants.JOB, notEmpty(actionId, "id"),
                prepareParams(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_ACTION_RETRIES_PARAM));
    }

    /**
     * On HTTP 200, reads the {@code JsonTags.WORKFLOW_ACTION_RETRIES} element of the response and converts
     * it into a list of string maps; any other status is handed to {@code handleError}, and {@code null}
     * is returned if that call returns normally.
     */
    @Override
    protected List<Map<String, String>> call(HttpURLConnection conn)
            throws IOException, OozieClientException {
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        // try-with-resources closes the response reader (and the stream beneath it) after parsing.
        try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            JSONObject json = (JSONObject) JSONValue.parse(reader);
            String retriesJson = json.get(JsonTags.WORKFLOW_ACTION_RETRIES).toString();
            return new ObjectMapper().readValue(retriesJson,
                    new TypeReference<List<Map<String, String>>>() {
                    });
        }
    }
}
/**
 * Get the info of a workflow job.
 * <p>
 * Equivalent to {@code getJobInfo(jobId, 0, 0)}.
 *
 * @param jobId job Id.
 * @return the job info.
 * @throws OozieClientException thrown if the job info could not be retrieved.
 */
public WorkflowJob getJobInfo(String jobId) throws OozieClientException {
    final int start = 0;
    final int len = 0;
    return getJobInfo(jobId, start, len);
}
/**
 * Get the JMS connection info.
 *
 * @return JMSConnectionInfo object
 * @throws OozieClientException thrown if the JMS info could not be retrieved.
 */
public JMSConnectionInfo getJMSConnectionInfo() throws OozieClientException {
    final JMSInfo request = new JMSInfo();
    return request.call();
}
/**
 * Get the info of a workflow job and a subset of its actions.
 *
 * @param jobId job Id.
 * @param start starting index in the list of actions belonging to the job
 * @param len number of actions to be returned
 * @return the job info.
 * @throws OozieClientException thrown if the job info could not be retrieved.
 */
public WorkflowJob getJobInfo(String jobId, int start, int len) throws OozieClientException {
    final JobInfo request = new JobInfo(jobId, start, len);
    return request.call();
}
/**
 * Get the info of a workflow action.
 *
 * @param actionId Id.
 * @return the workflow action info.
 * @throws OozieClientException thrown if the action info could not be retrieved.
 */
public WorkflowAction getWorkflowActionInfo(String actionId) throws OozieClientException {
    final WorkflowActionInfo request = new WorkflowActionInfo(actionId);
    return request.call();
}
/**
 * Get the retries info of a workflow action.
 *
 * @param actionId Id.
 * @return the workflow action retries info, one string map per entry of the server response.
 * @throws OozieClientException thrown if the retries info could not be retrieved.
 */
public List<Map<String, String>> getWorkflowActionRetriesInfo(String actionId) throws OozieClientException {
    final WorkflowActionRetriesInfo request = new WorkflowActionRetriesInfo(actionId);
    return request.call();
}
/**
 * Get the log of a workflow job as a string.
 *
 * @param jobId job Id.
 * @return the job log.
 * @throws OozieClientException thrown if the job log could not be retrieved.
 */
public String getJobLog(String jobId) throws OozieClientException {
    final JobLog request = new JobLog(jobId);
    return request.call();
}
/**
 * Get the audit log of a job and write it to the given stream.
 *
 * @param jobId the id of the job
 * @param ps the stream to write the result into
 * @throws OozieClientException thrown if the job audit log could not be retrieved.
 */
public void getJobAuditLog(String jobId, PrintStream ps) throws OozieClientException {
    final JobAuditLog request = new JobAuditLog(jobId, ps);
    request.call();
}
/**
 * Get the log of a job, restricted by retrieval type/scope and a log filter, and write it to the
 * given stream.
 *
 * @param jobId job Id.
 * @param logRetrievalType Based on which filter criteria the log is retrieved
 * @param logRetrievalScope Value for the retrieval type
 * @param logFilter log filter
 * @param ps Printstream of command line interface
 * @throws OozieClientException thrown if the job log could not be retrieved.
 */
public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, String logFilter,
        PrintStream ps) throws OozieClientException {
    final JobLog request = new JobLog(jobId, logRetrievalType, logRetrievalScope, logFilter, ps);
    request.call();
}
/**
 * Get the error log of a job and write it to the given stream.
 *
 * @param jobId the id of the job
 * @param ps the stream to write the result into
 * @throws OozieClientException thrown if the job log could not be retrieved.
 */
public void getJobErrorLog(String jobId, PrintStream ps) throws OozieClientException {
    final JobErrorLog request = new JobErrorLog(jobId, ps);
    request.call();
}
/**
 * Get the log of a job, restricted by retrieval type/scope, and write it to the given stream.
 * <p>
 * Same as the five-argument overload with a {@code null} log filter.
 *
 * @param jobId job Id.
 * @param logRetrievalType Based on which filter criteria the log is retrieved
 * @param logRetrievalScope Value for the retrieval type
 * @param ps Printstream of command line interface
 * @throws OozieClientException thrown if the job log could not be retrieved.
 */
public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps)
        throws OozieClientException {
    // No log filter: pass null through to the full overload.
    this.getJobLog(jobId, logRetrievalType, logRetrievalScope, null, ps);
}
/**
 * {@code JobMetadata} request bound to {@code RestConstants.JOB_SHOW_LOG}.
 */
private class JobLog extends JobMetadata {

    /** Log request without an output stream. */
    JobLog(String jobId) {
        super(jobId, RestConstants.JOB_SHOW_LOG);
    }

    /** Log request restricted by retrieval type/scope and log filter, carrying the given stream. */
    JobLog(String jobId, String logRetrievalType, String logRetrievalScope, String logFilter, PrintStream out) {
        super(jobId, logRetrievalType, logRetrievalScope, RestConstants.JOB_SHOW_LOG, logFilter, out);
    }
}
/**
 * {@code JobMetadata} request bound to {@code RestConstants.JOB_SHOW_ERROR_LOG}.
 */
private class JobErrorLog extends JobMetadata {

    /** Error-log request carrying the given stream. */
    JobErrorLog(String jobId, PrintStream out) {
        super(jobId, RestConstants.JOB_SHOW_ERROR_LOG, out);
    }
}
/**
 * {@code JobMetadata} request bound to {@code RestConstants.JOB_SHOW_AUDIT_LOG}.
 */
private class JobAuditLog extends JobMetadata {

    /** Audit-log request carrying the given stream. */
    JobAuditLog(String jobId, PrintStream out) {
        super(jobId, RestConstants.JOB_SHOW_AUDIT_LOG, out);
    }
}
/**
 * Gets the JMS topic name for a particular job.
 *
 * @param jobId given jobId
 * @return the JMS topic name
 * @throws OozieClientException thrown if the JMS topic could not be retrieved.
 */
public String getJMSTopicName(String jobId) throws OozieClientException {
    final JMSTopic request = new JMSTopic(jobId);
    return request.call();
}
private class JMSTopic extends ClientCallable<String> {
JMSTopic(String jobId) {
super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
RestConstants.JOB_SHOW_JMS_TOPIC));
}
protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8);
JSONObject json = (JSONObject) JSONValue.parse(reader);
return (String) json.get(JsonTags.JMS_TOPIC_NAME);
}
else {
handleError(conn);
}
return null;
}
}
/**
 * Get the definition of a workflow job.
 *
 * @param jobId job Id.
 * @return the job definition.
 * @throws OozieClientException thrown if the job definition could not be retrieved.
 */
public String getJobDefinition(String jobId) throws OozieClientException {
    final JobDefinition request = new JobDefinition(jobId);
    return request.call();
}
/**
 * Get the missing dependencies of coordinator actions and print them to the given stream.
 *
 * @param jobId the id of the job
 * @param actionList the list of coordinator actions
 * @param dates a comma-separated list of date ranges. Each date range element is specified with two dates
 *        separated by '::'
 * @param ps the stream to write the result into
 * @throws OozieClientException thrown if the info could not be retrieved.
 */
public void getCoordActionMissingDependencies(String jobId, String actionList, String dates, PrintStream ps)
        throws OozieClientException {
    final CoordActionMissingDependencies request =
            new CoordActionMissingDependencies(jobId, actionList, dates, ps);
    request.call();
}
/**
 * Get the missing dependencies of coordinator actions and print them to {@code System.out}.
 *
 * @param jobId the id of the job
 * @param actionList the list of coordinator actions
 * @param dates a comma-separated list of date ranges. Each date range element is specified with two dates
 *        separated by '::'
 * @throws OozieClientException thrown if the info could not be retrieved.
 */
public void getCoordActionMissingDependencies(String jobId, String actionList, String dates)
        throws OozieClientException {
    final CoordActionMissingDependencies request =
            new CoordActionMissingDependencies(jobId, actionList, dates);
    request.call();
}
private class CoordActionMissingDependencies extends ClientCallable<String> {
PrintStream printStream;
public CoordActionMissingDependencies(String jobId, String actionList, String dates, PrintStream ps) {
super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
RestConstants.COORD_ACTION_MISSING_DEPENDENCIES, RestConstants.JOB_COORD_SCOPE_ACTION_LIST, actionList,
RestConstants.JOB_COORD_SCOPE_DATE, dates));
this.printStream = ps;
}
public CoordActionMissingDependencies(String jobId, String actionList, String dates) {
this(jobId, actionList, dates, System.out);
}
@SuppressWarnings("unchecked")
protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8);
JSONObject json = (JSONObject) JSONValue.parse(reader);
if (json != null) {
JSONArray inputDependencies = (JSONArray) json.get(JsonTags.COORD_ACTION_MISSING_DEPENDENCIES);
for (Object dependencies : inputDependencies) {
printStream.println();
printStream.println("CoordAction : "
+ ((JSONObject) dependencies).get(JsonTags.COORDINATOR_ACTION_ID));
if (((JSONObject) dependencies).get(JsonTags.COORD_ACTION_FIRST_MISSING_DEPENDENCIES) != null) {
printStream.println("Blocked on : "
+ ((JSONObject) dependencies)
.get(JsonTags.COORD_ACTION_FIRST_MISSING_DEPENDENCIES));
}
if (((JSONObject) dependencies).get(JsonTags.COORDINATOR_ACTION_DATASETS) != null) {
JSONArray missingDependencies = (JSONArray) ((JSONObject) dependencies)
.get(JsonTags.COORDINATOR_ACTION_DATASETS);
for (Object missingDependenciesJson : missingDependencies) {
printStream.println("Dataset : "
+ ((JSONObject) missingDependenciesJson)
.get(JsonTags.COORDINATOR_ACTION_DATASET));
JSONArray inputDependenciesList = (JSONArray) ((JSONObject) missingDependenciesJson)
.get(JsonTags.COORDINATOR_ACTION_MISSING_DEPS);
printStream.println("Pending Dependencies : ");
if (inputDependenciesList != null) {
for (String anInputDependenciesList : (Iterable<String>) inputDependenciesList) {
printStream.println("\t " + anInputDependenciesList);
}
}
printStream.println();
}
}
}
}
else {
printStream.println(" No missing input dependencies found");
}
}
else {
handleError(conn);
}
return null;
}
}
/**
 * {@code JobMetadata} request bound to {@code RestConstants.JOB_SHOW_DEFINITION}.
 */
private class JobDefinition extends JobMetadata {

    /** Definition request without an output stream. */
    JobDefinition(String jobId) {
        super(jobId, RestConstants.JOB_SHOW_DEFINITION);
    }
}
/**
 * Base GET request on {@code RestConstants.JOB} for textual job metadata; the subclasses in this file
 * bind it to the log, error log, audit log and definition.
 * <p>
 * When a {@link PrintStream} was supplied the response body is copied to it and the call yields
 * {@code null}; otherwise the whole body is returned as a string.
 */
private class JobMetadata extends ClientCallable<String> {
    PrintStream printStream;

    /** Request for the given metadata type; no output stream is set. */
    JobMetadata(String jobId, String metaType) {
        super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
                metaType));
    }

    /** Request for the given metadata type whose body is copied to {@code ps}. */
    JobMetadata(String jobId, String metaType, PrintStream ps) {
        this(jobId, metaType);
        printStream = ps;
    }

    /** Request that additionally sends the log retrieval type, scope and filter parameters. */
    JobMetadata(String jobId, String logRetrievalType, String logRetrievalScope, String metaType, String logFilter,
            PrintStream ps) {
        super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
                metaType, RestConstants.JOB_LOG_TYPE_PARAM, logRetrievalType, RestConstants.JOB_LOG_SCOPE_PARAM,
                logRetrievalScope, RestConstants.LOG_FILTER_OPTION, logFilter));
        printStream = ps;
    }

    /**
     * On HTTP 200, streams the body to the print stream (yielding {@code null}) or returns it as a string
     * when no print stream is set; any other status is handed to {@code handleError}.
     */
    @Override
    protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        String body = null;
        try (InputStreamReader responseReader =
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            if (printStream == null) {
                body = getReaderAsString(responseReader, -1);
            }
            else {
                sendToOutputStream(responseReader, -1, printStream);
            }
        }
        return body;
    }

    /**
     * Copy the reader content to a print stream, e.g. the command line interface.
     * <p>
     * The limit is applied per chunk read: the chunk that makes the running total exceed
     * {@code maxLen} is dropped as a whole and copying stops.
     *
     * @param reader reader to copy from.
     * @param maxLen max content length allowed, if -1 there is no limit.
     * @param out stream the content is printed to.
     * @throws IOException thrown if the reader could not be read.
     */
    private void sendToOutputStream(Reader reader, int maxLen, PrintStream out) throws IOException {
        notNull(reader, "reader");
        final int flushThreshold = 1024;
        final char[] chunk = new char[2048];
        StringBuilder pending = new StringBuilder();
        int total = 0;
        for (int n = reader.read(chunk); n > -1; n = reader.read(chunk)) {
            total += n;
            if (maxLen > -1 && total > maxLen) {
                break;
            }
            pending.append(chunk, 0, n);
            if (pending.length() > flushThreshold) {
                out.print(pending.toString());
                pending = new StringBuilder("");
            }
        }
        // Emit whatever is still buffered.
        out.print(pending.toString());
    }

    /**
     * Return the reader content as a string and close the reader.
     * <p>
     * The limit is applied per chunk read: the chunk that makes the running total exceed
     * {@code maxLen} is dropped as a whole and reading stops.
     *
     * @param reader reader to read into a string.
     * @param maxLen max content length allowed, if -1 there is no limit.
     * @return the reader content.
     * @throws IOException thrown if the resource could not be read.
     */
    private String getReaderAsString(Reader reader, int maxLen) throws IOException {
        notNull(reader, "reader");
        final char[] chunk = new char[2048];
        final StringBuilder content = new StringBuilder();
        int total = 0;
        for (int n = reader.read(chunk); n > -1; n = reader.read(chunk)) {
            total += n;
            if (maxLen > -1 && total > maxLen) {
                break;
            }
            content.append(chunk, 0, n);
        }
        reader.close();
        return content.toString();
    }
}
/**
 * GET request on {@code RestConstants.JOB} that loads a coordinator job
 * ({@code JOB_SHOW_PARAM = JOB_SHOW_INFO}) with the filter, offset, length and order parameters.
 */
private class CoordJobInfo extends ClientCallable<CoordinatorJob> {
    CoordJobInfo(String jobId, String filter, int start, int len, String order) {
        super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
                RestConstants.JOB_SHOW_INFO, RestConstants.JOB_FILTER_PARAM, filter, RestConstants.OFFSET_PARAM,
                Integer.toString(start), RestConstants.LEN_PARAM, Integer.toString(len), RestConstants.ORDER_PARAM,
                order));
    }

    /**
     * Parses an HTTP 200 response body into a {@link CoordinatorJob}; any other status is handed to
     * {@code handleError}, and {@code null} is returned if that call returns normally.
     */
    @Override
    protected CoordinatorJob call(HttpURLConnection conn) throws IOException, OozieClientException {
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        // try-with-resources closes the response reader (and the stream beneath it) after parsing.
        try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            JSONObject json = (JSONObject) JSONValue.parse(reader);
            return JsonToBean.createCoordinatorJob(json);
        }
    }
}
/**
 * GET request on {@code RestConstants.JOB} that loads the workflow jobs of a coordinator action
 * ({@code JOB_SHOW_PARAM = ALL_WORKFLOWS_FOR_COORD_ACTION}).
 */
private class WfsForCoordAction extends ClientCallable<List<WorkflowJob>> {
    WfsForCoordAction(String coordActionId) {
        super("GET", RestConstants.JOB, notEmpty(coordActionId, "coordActionId"), prepareParams(
                RestConstants.JOB_SHOW_PARAM, RestConstants.ALL_WORKFLOWS_FOR_COORD_ACTION));
    }

    /**
     * On HTTP 200, converts the {@code JsonTags.WORKFLOWS_JOBS} array of the response (an empty array
     * if the element is absent) into workflow jobs; any other status is handed to {@code handleError},
     * and {@code null} is returned if that call returns normally.
     */
    @Override
    protected List<WorkflowJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        // try-with-resources closes the response reader (and the stream beneath it) after parsing.
        try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            JSONObject json = (JSONObject) JSONValue.parse(reader);
            JSONArray workflows = (JSONArray) json.get(JsonTags.WORKFLOWS_JOBS);
            return JsonToBean.createWorkflowJobList(workflows != null ? workflows : new JSONArray());
        }
    }
}
/**
 * GET request on {@code RestConstants.JOB} that loads a bundle job
 * ({@code JOB_SHOW_PARAM = JOB_SHOW_INFO}).
 */
private class BundleJobInfo extends ClientCallable<BundleJob> {
    BundleJobInfo(String jobId) {
        super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
                RestConstants.JOB_SHOW_INFO));
    }

    /**
     * Parses an HTTP 200 response body into a {@link BundleJob}; any other status is handed to
     * {@code handleError}, and {@code null} is returned if that call returns normally.
     */
    @Override
    protected BundleJob call(HttpURLConnection conn) throws IOException, OozieClientException {
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        // try-with-resources closes the response reader (and the stream beneath it) after parsing.
        try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            JSONObject json = (JSONObject) JSONValue.parse(reader);
            return JsonToBean.createBundleJob(json);
        }
    }
}
/**
 * GET request on {@code RestConstants.JOB} that loads a single coordinator action
 * ({@code JOB_SHOW_PARAM = JOB_SHOW_INFO}) addressed by its action id.
 */
private class CoordActionInfo extends ClientCallable<CoordinatorAction> {
    CoordActionInfo(String actionId) {
        super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM,
                RestConstants.JOB_SHOW_INFO));
    }

    /**
     * Parses an HTTP 200 response body into a {@link CoordinatorAction}; any other status is handed to
     * {@code handleError}, and {@code null} is returned if that call returns normally.
     */
    @Override
    protected CoordinatorAction call(HttpURLConnection conn) throws IOException, OozieClientException {
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        // try-with-resources closes the response reader (and the stream beneath it) after parsing.
        try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            JSONObject json = (JSONObject) JSONValue.parse(reader);
            return JsonToBean.createCoordinatorAction(json);
        }
    }
}
/**
 * Get the info of a bundle job.
 *
 * @param jobId job Id.
 * @return the job info.
 * @throws OozieClientException thrown if the job info could not be retrieved.
 */
public BundleJob getBundleJobInfo(String jobId) throws OozieClientException {
    final BundleJobInfo request = new BundleJobInfo(jobId);
    return request.call();
}
/**
 * Get the info of a coordinator job.
 * <p>
 * The request is sent with no filter, offset and length of -1, and order {@code "asc"}.
 *
 * @param jobId job Id.
 * @return the job info.
 * @throws OozieClientException thrown if the job info could not be retrieved.
 */
public CoordinatorJob getCoordJobInfo(String jobId) throws OozieClientException {
    final CoordJobInfo request = new CoordJobInfo(jobId, null, -1, -1, "asc");
    return request.call();
}
/**
 * Get the info of a coordinator job and a subset of its actions, listed in {@code "asc"} order.
 *
 * @param jobId job Id.
 * @param filter the status filter
 * @param start starting index in the list of actions belonging to the job
 * @param len number of actions to be returned
 * @return the job info.
 * @throws OozieClientException thrown if the job info could not be retrieved.
 */
public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len)
        throws OozieClientException {
    final CoordJobInfo request = new CoordJobInfo(jobId, filter, start, len, "asc");
    return request.call();
}
/**
 * Get the info of a coordinator job and a subset of its actions, listed in the requested order.
 *
 * @param jobId job Id.
 * @param filter the status filter
 * @param start starting index in the list of actions belonging to the job
 * @param len number of actions to be returned
 * @param order order to list coord actions (e.g, desc)
 * @return the job info.
 * @throws OozieClientException thrown if the job info could not be retrieved.
 */
public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len, String order)
        throws OozieClientException {
    final CoordJobInfo request = new CoordJobInfo(jobId, filter, start, len, order);
    return request.call();
}
/**
 * Get the workflow jobs that belong to a coordinator action.
 *
 * @param coordActionId coordinator action Id.
 * @return the workflow jobs reported by the server for the action (empty if the response lists none).
 * @throws OozieClientException thrown if the info could not be retrieved.
 */
public List<WorkflowJob> getWfsForCoordAction(String coordActionId) throws OozieClientException {
    final WfsForCoordAction request = new WfsForCoordAction(coordActionId);
    return request.call();
}
/**
 * Get the info of a coordinator action.
 *
 * @param actionId Id.
 * @return the coordinator action info.
 * @throws OozieClientException thrown if the action info could not be retrieved.
 */
public CoordinatorAction getCoordActionInfo(String actionId) throws OozieClientException {
    final CoordActionInfo request = new CoordActionInfo(actionId);
    return request.call();
}
/**
 * GET request on {@code RestConstants.JOBS} that lists workflow jobs (job type {@code "wf"}) matching
 * a filter, with offset and length parameters.
 */
private class JobsStatus extends ClientCallable<List<WorkflowJob>> {
    JobsStatus(String filter, int start, int len) {
        super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
                RestConstants.JOBTYPE_PARAM, "wf", RestConstants.OFFSET_PARAM, Integer.toString(start),
                RestConstants.LEN_PARAM, Integer.toString(len)));
    }

    /**
     * On HTTP 200, converts the {@code JsonTags.WORKFLOWS_JOBS} array of the response (an empty array
     * if the element is absent) into workflow jobs; any other status is handed to {@code handleError},
     * and {@code null} is returned if that call returns normally.
     */
    @Override
    protected List<WorkflowJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
        conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        // try-with-resources closes the response reader (and the stream beneath it) after parsing.
        try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            JSONObject json = (JSONObject) JSONValue.parse(reader);
            JSONArray workflows = (JSONArray) json.get(JsonTags.WORKFLOWS_JOBS);
            return JsonToBean.createWorkflowJobList(workflows != null ? workflows : new JSONArray());
        }
    }
}
/**
 * GET request on {@code RestConstants.JOBS} that lists coordinator jobs (job type {@code "coord"})
 * matching a filter, with offset and length parameters.
 */
private class CoordJobsStatus extends ClientCallable<List<CoordinatorJob>> {
    CoordJobsStatus(String filter, int start, int len) {
        super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
                RestConstants.JOBTYPE_PARAM, "coord", RestConstants.OFFSET_PARAM, Integer.toString(start),
                RestConstants.LEN_PARAM, Integer.toString(len)));
    }

    /**
     * On HTTP 200, converts the {@code JsonTags.COORDINATOR_JOBS} array of the response (an empty array
     * if the element is absent) into coordinator jobs; any other status is handed to
     * {@code handleError}, and {@code null} is returned if that call returns normally.
     */
    @Override
    protected List<CoordinatorJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
        conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        // try-with-resources closes the response reader (and the stream beneath it) after parsing.
        try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            JSONObject json = (JSONObject) JSONValue.parse(reader);
            JSONArray jobs = (JSONArray) json.get(JsonTags.COORDINATOR_JOBS);
            return JsonToBean.createCoordinatorJobList(jobs != null ? jobs : new JSONArray());
        }
    }
}
/**
 * GET request on {@code RestConstants.JOBS} that lists bundle jobs (job type {@code "bundle"})
 * matching a filter, with offset and length parameters.
 */
private class BundleJobsStatus extends ClientCallable<List<BundleJob>> {
    BundleJobsStatus(String filter, int start, int len) {
        super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
                RestConstants.JOBTYPE_PARAM, "bundle", RestConstants.OFFSET_PARAM, Integer.toString(start),
                RestConstants.LEN_PARAM, Integer.toString(len)));
    }

    /**
     * On HTTP 200, converts the {@code JsonTags.BUNDLE_JOBS} array of the response (an empty array if
     * the element is absent) into bundle jobs; any other status is handed to {@code handleError}, and
     * {@code null} is returned if that call returns normally.
     */
    @Override
    protected List<BundleJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
        conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        // try-with-resources closes the response reader (and the stream beneath it) after parsing.
        try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            JSONObject json = (JSONObject) JSONValue.parse(reader);
            JSONArray jobs = (JSONArray) json.get(JsonTags.BUNDLE_JOBS);
            return JsonToBean.createBundleJobList(jobs != null ? jobs : new JSONArray());
        }
    }
}
/**
 * GET request on {@code RestConstants.JOBS} that runs a bulk query ({@code JOBS_BULK_PARAM} carries
 * the filter) with offset and length parameters.
 */
private class BulkResponseStatus extends ClientCallable<List<BulkResponse>> {
    BulkResponseStatus(String filter, int start, int len) {
        super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_BULK_PARAM, filter,
                RestConstants.OFFSET_PARAM, Integer.toString(start), RestConstants.LEN_PARAM, Integer.toString(len)));
    }

    /**
     * On HTTP 200, converts the {@code JsonTags.BULK_RESPONSES} array of the response (an empty array
     * if the element is absent) into bulk responses; any other status is handed to
     * {@code handleError}, and {@code null} is returned if that call returns normally.
     */
    @Override
    protected List<BulkResponse> call(HttpURLConnection conn) throws IOException, OozieClientException {
        conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        // try-with-resources closes the response reader (and the stream beneath it) after parsing.
        try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            JSONObject json = (JSONObject) JSONValue.parse(reader);
            JSONArray results = (JSONArray) json.get(JsonTags.BULK_RESPONSES);
            return JsonToBean.createBulkResponseList(results != null ? results : new JSONArray());
        }
    }
}
/**
 * PUT request on {@code RestConstants.JOB} that kills coordinator actions
 * ({@code ACTION_PARAM = JOB_ACTION_KILL}) selected by a range type and scope.
 */
private class CoordActionsKill extends ClientCallable<List<CoordinatorAction>> {
    CoordActionsKill(String jobId, String rangeType, String scope) {
        super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
                RestConstants.JOB_ACTION_KILL, RestConstants.JOB_COORD_RANGE_TYPE_PARAM, rangeType,
                RestConstants.JOB_COORD_SCOPE_PARAM, scope));
    }

    /**
     * On HTTP 200, converts the {@code JsonTags.COORDINATOR_ACTIONS} array of the response into
     * coordinator actions; any other status is handed to {@code handleError}, and {@code null} is
     * returned if that call returns normally.
     */
    @Override
    protected List<CoordinatorAction> call(HttpURLConnection conn) throws IOException, OozieClientException {
        conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        // try-with-resources closes the response reader (and the stream beneath it) after parsing.
        try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            JSONObject json = (JSONObject) JSONValue.parse(reader);
            JSONArray coordActions = (JSONArray) json.get(JsonTags.COORDINATOR_ACTIONS);
            return JsonToBean.createCoordinatorActionList(coordActions);
        }
    }
}
/**
 * PUT request on {@code RestConstants.JOB} that ignores coordinator actions
 * ({@code ACTION_PARAM = JOB_ACTION_IGNORE}) selected by a range type and scope.
 */
private class CoordIgnore extends ClientCallable<List<CoordinatorAction>> {
    CoordIgnore(String jobId, String rerunType, String scope) {
        super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
                RestConstants.JOB_ACTION_IGNORE, RestConstants.JOB_COORD_RANGE_TYPE_PARAM,
                rerunType, RestConstants.JOB_COORD_SCOPE_PARAM, scope));
    }

    /**
     * On HTTP 200, converts the {@code JsonTags.COORDINATOR_ACTIONS} array of the response into
     * coordinator actions, or returns {@code null} when the response body does not parse to a JSON
     * object. Any other status is handed to {@code handleError}, and {@code null} is returned if that
     * call returns normally.
     */
    @Override
    protected List<CoordinatorAction> call(HttpURLConnection conn) throws IOException, OozieClientException {
        conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        // try-with-resources closes the response reader (and the stream beneath it) after parsing.
        try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            JSONObject json = (JSONObject) JSONValue.parse(reader);
            if (json == null) {
                return null;
            }
            JSONArray coordActions = (JSONArray) json.get(JsonTags.COORDINATOR_ACTIONS);
            return JsonToBean.createCoordinatorActionList(coordActions);
        }
    }
}
/**
 * PUT request on {@code RestConstants.JOB} that reruns coordinator actions
 * ({@code ACTION_PARAM = JOB_COORD_ACTION_RERUN}) selected by a range type and scope, with the
 * refresh, no-cleanup and failed flags sent as request parameters.
 */
private class CoordRerun extends ClientCallable<List<CoordinatorAction>> {
    // NOTE(review): conf is stored here but never read by call() in this class - confirm whether it
    // was meant to be sent to the server as the request body.
    private final Properties conf;

    CoordRerun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup, boolean failed,
            Properties conf) {
        super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
                RestConstants.JOB_COORD_ACTION_RERUN, RestConstants.JOB_COORD_RANGE_TYPE_PARAM, rerunType,
                RestConstants.JOB_COORD_SCOPE_PARAM, scope, RestConstants.JOB_COORD_RERUN_REFRESH_PARAM,
                Boolean.toString(refresh), RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean
                        .toString(noCleanup), RestConstants.JOB_COORD_RERUN_FAILED_PARAM, Boolean.toString(failed)));
        this.conf = conf;
    }

    /**
     * On HTTP 200, converts the {@code JsonTags.COORDINATOR_ACTIONS} array of the response into
     * coordinator actions; any other status is handed to {@code handleError}, and {@code null} is
     * returned if that call returns normally.
     */
    @Override
    protected List<CoordinatorAction> call(HttpURLConnection conn) throws IOException, OozieClientException {
        conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        // try-with-resources closes the response reader (and the stream beneath it) after parsing.
        try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
            JSONObject json = (JSONObject) JSONValue.parse(reader);
            JSONArray coordActions = (JSONArray) json.get(JsonTags.COORDINATOR_ACTIONS);
            return JsonToBean.createCoordinatorActionList(coordActions);
        }
    }
}
/**
 * PUT request on {@code RestConstants.JOB} that reruns the coordinators of a bundle
 * ({@code ACTION_PARAM = JOB_BUNDLE_ACTION_RERUN}) for the given coordinator and date scopes, with the
 * refresh and no-cleanup flags sent as request parameters.
 */
private class BundleRerun extends ClientCallable<Void> {
    BundleRerun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) {
        super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
                RestConstants.JOB_BUNDLE_ACTION_RERUN, RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM,
                coordScope, RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM, dateScope,
                RestConstants.JOB_COORD_RERUN_REFRESH_PARAM, Boolean.toString(refresh),
                RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean.toString(noCleanup)));
    }

    /**
     * Checks the response status: anything other than HTTP 200 is handed to {@code handleError}.
     *
     * @return always {@code null}; the response body is not read.
     */
    @Override
    protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
        conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
        final boolean succeeded = conn.getResponseCode() == HttpURLConnection.HTTP_OK;
        if (!succeeded) {
            handleError(conn);
        }
        return null;
    }
}
/**
 * Rerun coordinator actions.
 * <p>
 * The request is sent with the failed flag set to {@code false} and no properties.
 *
 * @param jobId coordinator jobId
 * @param rerunType rerun type 'date' if -date is used, 'action-id' if -action is used
 * @param scope rerun scope for date or actionIds
 * @param refresh true if -refresh is given in command option
 * @param noCleanup true if -nocleanup is given in command option
 * @return the list of rerun coordinator actions
 * @throws OozieClientException thrown if the info could not be retrieved.
 */
public List<CoordinatorAction> reRunCoord(String jobId, String rerunType, String scope, boolean refresh,
        boolean noCleanup) throws OozieClientException {
    final CoordRerun request = new CoordRerun(jobId, rerunType, scope, refresh, noCleanup, false, null);
    return request.call();
}
/**
 * Rerun coordinator actions with failed option.
 *
 * @param jobId coordinator jobId
 * @param rerunType rerun type 'date' if -date is used, 'action-id' if -action is used
 * @param scope rerun scope for date or actionIds
 * @param refresh true if -refresh is given in command option
 * @param noCleanup true if -nocleanup is given in command option
 * @param failed true if -failed is given in command option
 * @param props properties to use during rerun
 * @return the list of rerun coordinator actions
 * @throws OozieClientException thrown if the info could not be retrieved.
 */
public List<CoordinatorAction> reRunCoord(String jobId, String rerunType, String scope, boolean refresh,
        boolean noCleanup, boolean failed, Properties props) throws OozieClientException {
    final CoordRerun request = new CoordRerun(jobId, rerunType, scope, refresh, noCleanup, failed, props);
    return request.call();
}
/**
 * Rerun bundle coordinators.
 *
 * @param jobId bundle jobId
 * @param coordScope rerun scope for coordinator jobs
 * @param dateScope rerun scope for date
 * @param refresh true if -refresh is given in command option
 * @param noCleanup true if -nocleanup is given in command option
 * @return the result of the {@code BundleRerun} request, whose response handler always yields
 *         {@code null}
 * @throws OozieClientException thrown if the info could not be retrieved.
 */
public Void reRunBundle(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
        throws OozieClientException {
    final BundleRerun request = new BundleRerun(jobId, coordScope, dateScope, refresh, noCleanup);
    return request.call();
}
/**
 * Return the info of the workflow jobs that match the filter.
 *
 * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
 * @param start jobs offset, base 1.
 * @param len number of jobs to return.
 * @return a list with the workflow jobs info, without node details.
 * @throws OozieClientException thrown if the jobs info could not be retrieved.
 */
public List<WorkflowJob> getJobsInfo(String filter, int start, int len) throws OozieClientException {
    final JobsStatus request = new JobsStatus(filter, start, len);
    return request.call();
}
/**
 * Return the info of the workflow jobs that match the filter.
 * <p>
 * It returns the first 50 jobs that match the filter (offset 1, length 50).
 *
 * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
 * @return a list with the workflow jobs info, without node details.
 * @throws OozieClientException thrown if the jobs info could not be retrieved.
 */
public List<WorkflowJob> getJobsInfo(String filter) throws OozieClientException {
    final int firstJob = 1;
    final int maxJobs = 50;
    return getJobsInfo(filter, firstJob, maxJobs);
}
/**
 * Enable SLA alerts.
 *
 * @param jobIds the job ids
 * @param actions comma separated list of action ids or action id ranges
 * @param dates comma separated list of the nominal times
 * @throws OozieClientException the oozie client exception
 */
public void slaEnableAlert(String jobIds, String actions, String dates) throws OozieClientException {
    final UpdateSLA request = new UpdateSLA(RestConstants.SLA_ENABLE_ALERT, jobIds, actions, dates, null);
    request.call();
}
/**
 * Enable SLA alerts for a bundle, restricted to coordinators given by name/id.
 *
 * @param bundleId the bundle id
 * @param actions comma separated list of action ids or action id ranges
 * @param dates comma separated list of the nominal times
 * @param coords the coordinators
 * @throws OozieClientException the oozie client exception
 */
public void slaEnableAlert(String bundleId, String actions, String dates, String coords)
        throws OozieClientException {
    final UpdateSLA request = new UpdateSLA(RestConstants.SLA_ENABLE_ALERT, bundleId, actions, dates, coords);
    request.call();
}
/**
 * Disable SLA alerts.
 *
 * @param jobIds the job ids
 * @param actions comma separated list of action ids or action id ranges
 * @param dates comma separated list of the nominal times
 * @throws OozieClientException the oozie client exception
 */
public void slaDisableAlert(String jobIds, String actions, String dates) throws OozieClientException {
    final UpdateSLA request = new UpdateSLA(RestConstants.SLA_DISABLE_ALERT, jobIds, actions, dates, null);
    request.call();
}
/**
 * Disable SLA alerts for a bundle, restricted to coordinators given by name/id.
 *
 * @param bundleId the bundle id
 * @param actions comma separated list of action ids or action id ranges
 * @param dates comma separated list of the nominal times
 * @param coords the coordinators
 * @throws OozieClientException the oozie client exception
 */
public void slaDisableAlert(String bundleId, String actions, String dates, String coords)
        throws OozieClientException {
    final UpdateSLA request = new UpdateSLA(RestConstants.SLA_DISABLE_ALERT, bundleId, actions, dates, coords);
    request.call();
}
/**
 * Change SLA definitions.
 * SLA change definition parameters can be [&lt;key&gt;=&lt;value&gt;,...&lt;key&gt;=&lt;value&gt;]
 * Supported parameter key names are should-start, should-end and max-duration
 *
 * @param jobIds the job ids
 * @param actions comma separated list of action ids or action id ranges.
 * @param dates comma separated list of the nominal times
 * @param newSlaParams the new sla params
 * @throws OozieClientException the oozie client exception
 */
public void slaChange(String jobIds, String actions, String dates, String newSlaParams) throws OozieClientException {
    final UpdateSLA request = new UpdateSLA(RestConstants.SLA_CHANGE, jobIds, actions, dates, null, newSlaParams);
    request.call();
}
/**
 * Changes SLA definitions for a bundle, scoped to the given coordinators (by name or id).
 * SLA change definition parameters can be [&lt;key&gt;=&lt;value&gt;,...&lt;key&gt;=&lt;value&gt;]
 * Supported parameter key names are should-start, should-end and max-duration
 *
 * @param bundleId the bundle id
 * @param actions comma separated list of action ids or action id ranges
 * @param dates comma separated list of the nominal times
 * @param coords the coords
 * @param newSlaParams the new sla params
 * @throws OozieClientException the oozie client exception
 */
public void slaChange(String bundleId, String actions, String dates, String coords, String newSlaParams)
        throws OozieClientException {
    final UpdateSLA request =
            new UpdateSLA(RestConstants.SLA_CHANGE, bundleId, actions, dates, coords, newSlaParams);
    request.call();
}
/**
 * Changes SLA definitions for a bundle, taking the new SLA parameters as a map.
 * The map is flattened with {@link #mapToString(Map)} before it is sent.
 * Supported parameter key names are should-start, should-end and max-duration
 *
 * @param bundleId the bundle id
 * @param actions comma separated list of action ids or action id ranges
 * @param dates comma separated list of the nominal times
 * @param coords the coords
 * @param newSlaParams the new sla params
 * @throws OozieClientException the oozie client exception
 */
public void slaChange(String bundleId, String actions, String dates, String coords, Map<String, String> newSlaParams)
        throws OozieClientException {
    final String flattenedParams = mapToString(newSlaParams);
    final UpdateSLA request =
            new UpdateSLA(RestConstants.SLA_CHANGE, bundleId, actions, dates, coords, flattenedParams);
    request.call();
}
/**
 * Flattens a map into a single string of {@code key=value;} pairs, in the map's iteration order.
 * Every pair, including the last one, is terminated by a semicolon; an empty map yields an empty string.
 *
 * @param map the map
 * @return the string
 */
protected String mapToString(Map<String, String> map) {
    final StringBuilder joined = new StringBuilder();
    for (final Entry<String, String> pair : map.entrySet()) {
        joined.append(pair.getKey());
        joined.append("=");
        joined.append(pair.getValue());
        joined.append(";");
    }
    return joined.toString();
}
/**
 * PUT request on the job resource carrying an SLA action and its scoping parameters.
 * Prints "Done" to standard output on HTTP 200; any other status is handed to handleError.
 */
private class UpdateSLA extends ClientCallable<Void> {

    /** Builds a request without a change value (used for the enable/disable alert actions). */
    UpdateSLA(String action, String jobIds, String coordActions, String dates, String coords) {
        super("PUT", RestConstants.JOB, notEmpty(jobIds, "jobIds"),
                prepareParams(
                        RestConstants.ACTION_PARAM, action,
                        RestConstants.JOB_COORD_SCOPE_ACTION_LIST, coordActions,
                        RestConstants.JOB_COORD_SCOPE_DATE, dates,
                        RestConstants.COORDINATORS_PARAM, coords));
    }

    /** Builds a request that additionally carries the new SLA parameters as the change value. */
    UpdateSLA(String action, String jobIds, String coordActions, String dates, String coords, String newSlaParams) {
        super("PUT", RestConstants.JOB, notEmpty(jobIds, "jobIds"),
                prepareParams(
                        RestConstants.ACTION_PARAM, action,
                        RestConstants.JOB_COORD_SCOPE_ACTION_LIST, coordActions,
                        RestConstants.JOB_COORD_SCOPE_DATE, dates,
                        RestConstants.COORDINATORS_PARAM, coords,
                        RestConstants.JOB_CHANGE_VALUE, newSlaParams));
    }

    @Override
    protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
        conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
        final boolean succeeded = conn.getResponseCode() == HttpURLConnection.HTTP_OK;
        if (!succeeded) {
            handleError(conn);
        }
        else {
            System.out.println("Done");
        }
        return null;
    }
}
/**
 * Prints SLA info about coordinator and workflow jobs and actions to standard output.
 *
 * @param start starting offset
 * @param len number of results
 * @param filter filters to use. Elements must be semicolon-separated name=value pairs
 * @throws OozieClientException thrown if the sla info could not be retrieved
 */
public void getSlaInfo(int start, int len, String filter) throws OozieClientException {
    final SlaInfo request = new SlaInfo(start, len, filter);
    request.call();
}
/**
 * GET request on the SLA resource (protocol version 1) that echoes the response body,
 * line by line, to standard output.
 */
private class SlaInfo extends ClientCallable<Void> {

    /**
     * @param start value sent as the "greater than sequence id" parameter
     * @param len value sent as the maximum number of events
     * @param filter value sent as the jobs filter parameter
     */
    SlaInfo(int start, int len, String filter) {
        super("GET", WS_PROTOCOL_VERSION_1, RestConstants.SLA, "", prepareParams(RestConstants.SLA_GT_SEQUENCE_ID,
                Integer.toString(start), RestConstants.MAX_EVENTS, Integer.toString(len),
                RestConstants.JOBS_FILTER_PARAM, filter));
    }

    @Override
    protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
        conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
        if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
            // try-with-resources: the reader (and the underlying response stream) is released
            // even when reading or printing fails part-way through.
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = br.readLine()) != null) {
                    System.out.println(line);
                }
            }
        }
        else {
            handleError(conn);
        }
        return null;
    }
}
/**
 * GET request on the jobs resource that looks up a workflow ("wf") job by its external id
 * and returns the job id found in the JSON response.
 */
private class JobIdAction extends ClientCallable<String> {

    JobIdAction(String externalId) {
        super("GET", RestConstants.JOBS, "",
                prepareParams(
                        RestConstants.JOBTYPE_PARAM, "wf",
                        RestConstants.JOBS_EXTERNAL_ID_PARAM, externalId));
    }

    @Override
    protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        final Reader body = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8);
        final JSONObject parsed = (JSONObject) JSONValue.parse(body);
        return (String) parsed.get(JsonTags.JOB_ID);
    }
}
/**
 * Looks up the workflow job id that corresponds to an external id.
 * <p>
 * The external id must have been provided at job creation time.
 *
 * @param externalId external Id given at job creation time.
 * @return the workflow job Id for an external Id, <code>null</code> if none.
 * @throws OozieClientException thrown if the operation could not be done.
 */
public String getJobId(String externalId) throws OozieClientException {
    final JobIdAction lookup = new JobIdAction(externalId);
    return lookup.call();
}
/**
 * PUT request on the admin status resource that sends the requested system mode.
 * Any status other than HTTP 200 is handed to handleError.
 */
private class SetSystemMode extends ClientCallable<Void> {

    public SetSystemMode(SYSTEM_MODE status) {
        super("PUT", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE,
                prepareParams(RestConstants.ADMIN_SYSTEM_MODE_PARAM, String.valueOf(status)));
    }

    @Override
    public Void call(HttpURLConnection conn) throws IOException, OozieClientException {
        final boolean succeeded = conn.getResponseCode() == HttpURLConnection.HTTP_OK;
        if (!succeeded) {
            handleError(conn);
        }
        return null;
    }
}
/**
 * Asks the Oozie server to switch to the given system mode.
 *
 * @param status the system mode to set on the server.
 * @throws OozieClientException if it fails to set the system mode.
 */
public void setSystemMode(SYSTEM_MODE status) throws OozieClientException {
    final SetSystemMode request = new SetSystemMode(status);
    request.call();
}
/**
 * GET request on the admin status resource that parses the system mode out of the JSON response.
 */
private class GetSystemMode extends ClientCallable<SYSTEM_MODE> {

    GetSystemMode() {
        super("GET", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE, prepareParams());
    }

    @Override
    protected SYSTEM_MODE call(HttpURLConnection conn) throws IOException, OozieClientException {
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            // Only reached if handleError returns normally.
            return SYSTEM_MODE.NORMAL;
        }
        final Reader body = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8);
        final JSONObject parsed = (JSONObject) JSONValue.parse(body);
        final String modeName = (String) parsed.get(JsonTags.OOZIE_SYSTEM_MODE);
        return SYSTEM_MODE.valueOf(modeName);
    }
}
/**
 * Returns the current system mode reported by the Oozie server.
 *
 * @return the system mode of the server
 * @throws OozieClientException thrown if the system mode could not be obtained.
 */
public SYSTEM_MODE getSystemMode() throws OozieClientException {
    final GetSystemMode request = new GetSystemMode();
    return request.call();
}
/**
 * Requests a ShareLib update on the server and returns the formatted update status.
 *
 * @return the text produced by the ShareLib update request
 * @throws OozieClientException thrown if the request fails
 */
public String updateShareLib() throws OozieClientException {
    final UpdateSharelib request = new UpdateSharelib();
    return request.call();
}
/**
 * Lists the ShareLib entries available on the server for the given key.
 *
 * @param sharelibKey the ShareLib key sent with the request
 * @return the text produced by the ShareLib list request
 * @throws OozieClientException thrown if the request fails
 */
public String listShareLib(String sharelibKey) throws OozieClientException {
    final ListShareLib request = new ListShareLib(sharelibKey);
    return request.call();
}
/**
 * GET request on the admin build-version resource; returns the build info entry of the
 * JSON response as a string.
 */
private class GetBuildVersion extends ClientCallable<String> {

    GetBuildVersion() {
        super("GET", RestConstants.ADMIN, RestConstants.ADMIN_BUILD_VERSION_RESOURCE, prepareParams());
    }

    @Override
    protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        final Reader body = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8);
        final JSONObject parsed = (JSONObject) JSONValue.parse(body);
        final Object buildInfo = parsed.get(JsonTags.BUILD_INFO);
        return buildInfo.toString();
    }
}
/**
 * POST request on the validate resource. When the file name is an absolute local path
 * (starts with "/"), the file content is streamed as the request body.
 */
private class ValidateXML extends ClientCallable<String> {
    private final String file;

    /**
     * @param file file name sent as the file parameter; also read locally when it starts with "/"
     * @param user user name sent as the user parameter
     */
    ValidateXML(String file, String user) {
        super("POST", WS_PROTOCOL_VERSION, RestConstants.VALIDATE, "",
                prepareParams(RestConstants.FILE_PARAM, file, RestConstants.USER_PARAM, user));
        this.file = file;
    }

    /**
     * @return the validation entry of the JSON response on HTTP 200, or {@code null} on HTTP 404
     */
    @Override
    protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
        conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
        if (file.startsWith("/")) {
            // try-with-resources: the local file handle is released even if the upload fails.
            try (FileInputStream fi = new FileInputStream(new File(file))) {
                byte[] buffer = new byte[1024];
                int n;
                while (-1 != (n = fi.read(buffer))) {
                    conn.getOutputStream().write(buffer, 0, n);
                }
            }
        }
        final int responseCode = conn.getResponseCode();
        if (responseCode == HttpURLConnection.HTTP_OK) {
            Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8);
            JSONObject json = (JSONObject) JSONValue.parse(reader);
            return (String) json.get(JsonTags.VALIDATE);
        }
        else if (responseCode == HttpURLConnection.HTTP_NOT_FOUND) {
            return null;
        }
        else {
            handleError(conn);
        }
        return null;
    }
}
/**
 * POST request that only sets the XML content type on the connection.
 * NOTE(review): neither {@code xml} nor {@code user} is used by the constructor, and
 * {@code call} returns {@code null} without reading a response; this looks like a
 * placeholder, confirm before relying on it.
 */
private class SubmitXML extends ClientCallable<String> {

    SubmitXML(final String xml, final String user) {
        super("POST", WS_PROTOCOL_VERSION, RestConstants.ACTION_PARAM, RestConstants.JOB_ACTION_SUBMIT,
                prepareParams());
    }

    @Override
    protected String call(final HttpURLConnection conn) throws IOException, OozieClientException {
        final String contentType = RestConstants.XML_CONTENT_TYPE;
        conn.setRequestProperty("content-type", contentType);
        return null;
    }
}
/**
 * GET request on the admin update-sharelib resource (sent with the all-server flag set to "true").
 * The JSON response, either a single object or an array of objects, is rendered as text.
 */
private class UpdateSharelib extends ClientCallable<String> {

    UpdateSharelib() {
        super("GET", RestConstants.ADMIN, RestConstants.ADMIN_UPDATE_SHARELIB,
                prepareParams(RestConstants.ALL_SERVER_REQUEST, "true"));
    }

    @Override
    protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
        final StringBuilder report = new StringBuilder();
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        final Reader body = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8);
        final Object parsed = JSONValue.parse(body);
        final String newline = System.getProperty("line.separator");
        report.append("[ShareLib update status]").append(newline);
        if (parsed instanceof JSONArray) {
            for (Object element : (JSONArray) parsed) {
                appendUpdateStatus(report, element, newline);
            }
        }
        else {
            appendUpdateStatus(report, parsed, newline);
        }
        return report.toString();
    }

    /** Appends one "key = value" line per entry of the element's update object, then a blank line. */
    private void appendUpdateStatus(StringBuilder out, Object element, String newline) {
        final JSONObject status = (JSONObject) ((JSONObject) element).get(JsonTags.SHARELIB_LIB_UPDATE);
        for (Object key : status.keySet()) {
            out.append("\t").append(key).append(" = ").append(status.get(key)).append(newline);
        }
        out.append(newline);
    }
}
/**
 * GET request on the admin list-sharelib resource. Renders each ShareLib name followed by
 * its files (tab-indented). Returns {@code null} when the ShareLib entry of the response
 * is not a JSON array.
 */
private class ListShareLib extends ClientCallable<String> {

    ListShareLib(String sharelibKey) {
        super("GET", RestConstants.ADMIN, RestConstants.ADMIN_LIST_SHARELIB,
                prepareParams(RestConstants.SHARE_LIB_REQUEST_KEY, sharelibKey));
    }

    @Override
    protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        final Reader body = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8);
        final JSONObject parsed = (JSONObject) JSONValue.parse(body);
        final Object libs = parsed.get(JsonTags.SHARELIB_LIB);
        if (!(libs instanceof JSONArray)) {
            return null;
        }
        final String newline = System.getProperty("line.separator");
        final StringBuilder listing = new StringBuilder();
        listing.append("[Available ShareLib]").append(newline);
        for (Object element : (JSONArray) libs) {
            final JSONObject lib = (JSONObject) element;
            listing.append(lib.get(JsonTags.SHARELIB_LIB_NAME)).append(newline);
            final Object files = lib.get(JsonTags.SHARELIB_LIB_FILES);
            if (files != null) {
                for (Object file : (JSONArray) files) {
                    listing.append("\t").append(file).append(newline);
                }
            }
        }
        return listing.toString();
    }
}
/**
 * Parses the purge options and sends a purge request to the server.
 * <p>
 * Options are semicolon-separated {@code key=value} pairs. Recognized keys (case-insensitive)
 * are {@code wf}, {@code coord}, {@code bundle}, {@code limit} and {@code oldcoordaction}.
 * The first four are validated with {@code ValidationUtil.parsePositiveInteger}; the value of
 * {@code oldcoordaction} is forwarded as given. Options that are not supplied are sent as
 * empty strings.
 *
 * @param purgeOptions the option string, may be {@code null} to send no options
 * @return the result of the purge request
 * @throws OozieClientException if a pair has no value, a key is unknown, or the request fails
 */
public String purgeCommand(String purgeOptions) throws OozieClientException {
    String workflowAge = "";
    String coordAge = "";
    String bundleAge = "";
    String purgeLimit = "";
    String oldCoordAction = "";
    if (purgeOptions != null) {
        String[] options = purgeOptions.split(";");
        for (String option : options) {
            String[] pair = option.split("=");
            if (pair.length < 2) {
                throw new OozieClientException(OozieClientException.INVALID_INPUT,
                        "Invalid purge option pair [" + option + "] specified.");
            }
            // Locale.ROOT keeps key matching independent of the JVM default locale
            // (e.g. "LIMIT" would not lower-case to "limit" under a Turkish locale).
            String key = pair[0].toLowerCase(java.util.Locale.ROOT);
            String value = pair[1];
            switch (key) {
                case "wf":
                    workflowAge = String.valueOf(ValidationUtil.parsePositiveInteger(value));
                    break;
                case "coord":
                    coordAge = String.valueOf(ValidationUtil.parsePositiveInteger(value));
                    break;
                case "bundle":
                    bundleAge = String.valueOf(ValidationUtil.parsePositiveInteger(value));
                    break;
                case "limit":
                    purgeLimit = String.valueOf(ValidationUtil.parsePositiveInteger(value));
                    break;
                case "oldcoordaction":
                    oldCoordAction = value;
                    break;
                default:
                    throw new OozieClientException(OozieClientException.INVALID_INPUT,
                            "Invalid purge option [" + key + "] specified.");
            }
        }
    }
    PurgeCommand purgeCommand = new PurgeCommand(workflowAge, coordAge, bundleAge, purgeLimit, oldCoordAction);
    return purgeCommand.call();
}
/**
 * PUT request on the admin purge resource; returns the purge entry of the JSON response
 * as a string.
 */
private class PurgeCommand extends ClientCallable<String> {

    PurgeCommand(String workflowAge, String coordAge, String bundleAge, String purgeLimit, String oldCoordAction) {
        super("PUT", RestConstants.ADMIN, RestConstants.ADMIN_PURGE,
                prepareParams(
                        RestConstants.PURGE_WF_AGE, workflowAge,
                        RestConstants.PURGE_COORD_AGE, coordAge,
                        RestConstants.PURGE_BUNDLE_AGE, bundleAge,
                        RestConstants.PURGE_LIMIT, purgeLimit,
                        RestConstants.PURGE_OLD_COORD_ACTION, oldCoordAction));
    }

    @Override
    protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        final Reader body = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8);
        final JSONObject parsed = (JSONObject) JSONValue.parse(body);
        final Object purgeMessage = parsed.get(JsonTags.PURGE);
        return purgeMessage.toString();
    }
}
/**
 * Fetches the build version of the Oozie server.
 *
 * @return the Oozie server build version.
 * @throws OozieClientException throw if the server build version could not be retrieved.
 */
public String getServerBuildVersion() throws OozieClientException {
    final GetBuildVersion request = new GetBuildVersion();
    return request.call();
}
/**
 * Reads the build version of this Oozie client from the client's build info properties.
 *
 * @return the Oozie client build version.
 */
public String getClientBuildVersion() {
    return BuildInfo.getBuildInfo()
            .getProperty(BuildInfo.BUILD_VERSION);
}
/**
 * Validates a workflow application file.
 * <p>
 * A leading {@code file://} prefix is stripped. A name without any {@code ://} is treated as a
 * local file: it must exist as a regular file and is replaced by its absolute path. The
 * resulting name is then sent in a validation request together with the current user name.
 *
 * @param file local file or hdfs file.
 * @return the validation result returned by the server.
 * @throws OozieClientException thrown if the local file does not exist or the validation
 *         result could not be retrieved.
 */
public String validateXML(String file) throws OozieClientException {
    final String localPrefix = "file://";
    String fileName = file.startsWith(localPrefix) ? file.substring(localPrefix.length()) : file;
    final boolean isLocal = !fileName.contains("://");
    if (isLocal) {
        final File localFile = new File(fileName);
        if (!localFile.isFile()) {
            throw new OozieClientException("File error", "File does not exist : " + localFile.getAbsolutePath());
        }
        fileName = localFile.getAbsolutePath();
    }
    final ValidateXML request = new ValidateXML(fileName, getUserName());
    return request.call();
}
/**
 * Resolves the user name to send with requests: the value held in {@code USER_NAME_TL}
 * when it is set, otherwise the {@code user.name} system property.
 */
private String getUserName() {
    final String threadUser = USER_NAME_TL.get();
    return (threadUser != null) ? threadUser : System.getProperty("user.name");
}
/**
 * Retrieves the coordinator jobs that match the filter.
 *
 * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
 * @param start jobs offset, base 1.
 * @param len number of jobs to return.
 * @return a list with the coordinator jobs info
 * @throws OozieClientException thrown if the jobs info could not be retrieved.
 */
public List<CoordinatorJob> getCoordJobsInfo(String filter, int start, int len) throws OozieClientException {
    final CoordJobsStatus request = new CoordJobsStatus(filter, start, len);
    return request.call();
}
/**
 * Retrieves the bundle jobs that match the filter.
 *
 * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
 * @param start jobs offset, base 1.
 * @param len number of jobs to return.
 * @return a list with the bundle jobs info
 * @throws OozieClientException thrown if the jobs info could not be retrieved.
 */
public List<BundleJob> getBundleJobsInfo(String filter, int start, int len) throws OozieClientException {
    final BundleJobsStatus request = new BundleJobsStatus(filter, start, len);
    return request.call();
}
/**
 * Retrieves the bulk responses that match the filter.
 *
 * @param filter filter passed to the bulk request
 * @param start offset passed to the bulk request
 * @param len length passed to the bulk request
 * @return the list of bulk responses
 * @throws OozieClientException thrown if the request fails
 */
public List<BulkResponse> getBulkInfo(String filter, int start, int len) throws OozieClientException {
    final BulkResponseStatus request = new BulkResponseStatus(filter, start, len);
    return request.call();
}
/**
 * Poll a job (Workflow Job ID, Coordinator Job ID, Coordinator Action ID, or Bundle Job ID) and return when it has reached a
 * terminal state or the timeout has elapsed.
 * <p>
 * The set of terminal states is chosen from the id: ids ending in {@code -W}, {@code -C} or {@code -B}, or
 * containing {@code -C@}, are supported; anything else is rejected.
 * <p>
 * If the calling thread is interrupted while waiting, polling continues, and the thread's interrupt status
 * is restored before this method returns.
 *
 * @param id The Job ID
 * @param timeout timeout in minutes (values less than 1 indicate no timeout)
 * @param interval polling interval in minutes (must be positive)
 * @param verbose if true, the current status will be printed out at each poll; if false, no output
 * @throws OozieClientException thrown if the job's status could not be retrieved
 * @throws IllegalArgumentException if the interval is not positive or the id has an unsupported form
 */
public void pollJob(String id, int timeout, int interval, boolean verbose) throws OozieClientException {
    // Value first, name second, matching the other notEmpty call sites in this class.
    notEmpty(id, "id");
    if (interval < 1) {
        throw new IllegalArgumentException("interval must be a positive integer");
    }
    final boolean noTimeout = (timeout < 1);
    // Long arithmetic: minutes-to-millis in int overflows for values above ~35791 minutes.
    final long endTime = System.currentTimeMillis() + timeout * 60L * 1000L;
    final long intervalMillis = interval * 60L * 1000L;
    final Set<String> completedStatuses;
    if (id.endsWith("-W")) {
        completedStatuses = COMPLETED_WF_STATUSES;
    } else if (id.endsWith("-C")) {
        completedStatuses = COMPLETED_COORD_AND_BUNDLE_STATUSES;
    } else if (id.endsWith("-B")) {
        completedStatuses = COMPLETED_COORD_AND_BUNDLE_STATUSES;
    } else if (id.contains("-C@")) {
        completedStatuses = COMPLETED_COORD_ACTION_STATUSES;
    } else {
        throw new IllegalArgumentException("invalid job type");
    }
    String status = getStatus(id);
    if (verbose) {
        System.out.println(status);
    }
    boolean interrupted = false;
    try {
        while (!completedStatuses.contains(status) && (noTimeout || System.currentTimeMillis() <= endTime)) {
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException ie) {
                // Keep polling as before, but remember the interrupt so it is not lost.
                interrupted = true;
            }
            status = getStatus(id);
            if (verbose) {
                System.out.println(status);
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Fetches the status of a job (Workflow Job ID, Coordinator Job ID, Coordinator Action ID, or Bundle Job ID).
 *
 * @param jobId given jobId
 * @return the status
 * @throws OozieClientException thrown if the status could not be retrieved
 */
public String getStatus(String jobId) throws OozieClientException {
    final Status request = new Status(jobId);
    return request.call();
}
/**
 * GET request on the job resource asking for the status view; returns the status entry
 * of the JSON response.
 */
private class Status extends ClientCallable<String> {

    Status(String jobId) {
        super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"),
                prepareParams(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_STATUS));
    }

    @Override
    protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            handleError(conn);
            return null;
        }
        final Reader body = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8);
        final JSONObject parsed = (JSONObject) JSONValue.parse(body);
        return (String) parsed.get(JsonTags.STATUS);
    }
}
/**
 * GET request on the admin queue-dump resource. Renders the queue dump and the uniqueness
 * map dump from the JSON response as a list of text lines, with separator lines before,
 * between and after the two sections.
 */
private class GetQueueDump extends ClientCallable<List<String>> {

    GetQueueDump() {
        super("GET", RestConstants.ADMIN, RestConstants.ADMIN_QUEUE_DUMP_RESOURCE, prepareParams());
    }

    @Override
    protected List<String> call(HttpURLConnection conn) throws IOException, OozieClientException {
        final boolean succeeded = conn.getResponseCode() == HttpURLConnection.HTTP_OK;
        if (!succeeded) {
            handleError(conn);
            return null;
        }

        final Reader body = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8);
        final JSONObject parsed = (JSONObject) JSONValue.parse(body);
        final List<String> lines = new ArrayList<>();

        addSeparator(lines);
        addQueueMessages(parsed, lines, JsonTags.QUEUE_DUMP, JsonTags.CALLABLE_DUMP,
                "[Server Queue Dump]:",
                "The queue dump is empty, nothing to display.");
        addSeparator(lines);
        addQueueMessages(parsed, lines, JsonTags.UNIQUE_MAP_DUMP, JsonTags.UNIQUE_ENTRY_DUMP,
                "[Server Uniqueness Map Dump]:",
                "The uniqueness map dump is empty, nothing to display.");
        addSeparator(lines);
        return lines;
    }

    /**
     * Appends the header, then every non-null {@code queueValue} entry of the array stored
     * under {@code queueName}; appends {@code emptyMessage} when that array has no elements.
     */
    private void addQueueMessages(JSONObject json,
                                  List<String> queueMessages,
                                  String queueName,
                                  String queueValue,
                                  String headerMessage,
                                  String emptyMessage) {
        final JSONArray dump = (JSONArray) json.get(queueName);
        queueMessages.add(headerMessage);
        for (Object element : dump) {
            final Object value = ((JSONObject) element).get(queueValue);
            if (value != null) {
                queueMessages.add((String) value);
            }
        }
        if (dump.isEmpty()) {
            queueMessages.add(emptyMessage);
        }
    }

    /** Appends a fixed line of asterisks. */
    private void addSeparator(List<String> list) {
        list.add("******************************************");
    }
}
/**
 * Fetches a dump of the commands in the Oozie server queue.
 *
 * @return the list of strings of callable identification in queue
 * @throws OozieClientException throw if it the queue dump could not be retrieved.
 */
public List<String> getQueueDump() throws OozieClientException {
    final GetQueueDump request = new GetQueueDump();
    return request.call();
}
/** GET request on the admin resource that lists the available Oozie servers. */
private class GetAvailableOozieServers extends MapClientCallable {

    GetAvailableOozieServers() {
        super("GET",
                RestConstants.ADMIN,
                RestConstants.ADMIN_AVAILABLE_OOZIE_SERVERS_RESOURCE,
                prepareParams());
    }
}
/**
 * Fetches the available Oozie servers.
 *
 * @return the list of available Oozie servers.
 * @throws OozieClientException throw if it the list of available Oozie servers could not be retrieved.
 */
public Map<String, String> getAvailableOozieServers() throws OozieClientException {
    final GetAvailableOozieServers request = new GetAvailableOozieServers();
    return request.call();
}
/** GET request on the admin configuration resource. */
private class GetServerConfiguration extends MapClientCallable {

    GetServerConfiguration() {
        super("GET",
                RestConstants.ADMIN,
                RestConstants.ADMIN_CONFIG_RESOURCE,
                prepareParams());
    }
}
/**
 * Fetches the Oozie system configuration.
 *
 * @return the Oozie system configuration.
 * @throws OozieClientException throw if the system configuration could not be retrieved.
 */
public Map<String, String> getServerConfiguration() throws OozieClientException {
    final GetServerConfiguration request = new GetServerConfiguration();
    return request.call();
}
/** GET request on the admin Java system properties resource. */
private class GetJavaSystemProperties extends MapClientCallable {

    GetJavaSystemProperties() {
        super("GET",
                RestConstants.ADMIN,
                RestConstants.ADMIN_JAVA_SYS_PROPS_RESOURCE,
                prepareParams());
    }
}
/**
 * Fetches the Java system properties of the Oozie server.
 *
 * @return the Oozie Java system properties.
 * @throws OozieClientException throw if the system properties could not be retrieved.
 */
public Map<String, String> getJavaSystemProperties() throws OozieClientException {
    final GetJavaSystemProperties request = new GetJavaSystemProperties();
    return request.call();
}
/** GET request on the admin OS environment resource. */
private class GetOSEnv extends MapClientCallable {

    GetOSEnv() {
        super("GET",
                RestConstants.ADMIN,
                RestConstants.ADMIN_OS_ENV_RESOURCE,
                prepareParams());
    }
}
/**
 * Fetches the OS environment of the Oozie server.
 *
 * @return the Oozie system OS environment.
 * @throws OozieClientException throw if the system OS environment could not be retrieved.
 */
public Map<String, String> getOSEnv() throws OozieClientException {
    final GetOSEnv request = new GetOSEnv();
    return request.call();
}
/**
 * GET request on the admin metrics resource. Returns the parsed metrics on HTTP 200 and
 * {@code null} on HTTP 503, in which case the instrumentation endpoint should be used instead.
 */
private class GetMetrics extends ClientCallable<Metrics> {

    GetMetrics() {
        super("GET", RestConstants.ADMIN, RestConstants.ADMIN_METRICS_RESOURCE, prepareParams());
    }

    @Override
    protected Metrics call(HttpURLConnection conn) throws IOException, OozieClientException {
        if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
            final Reader body = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8);
            final JSONObject parsed = (JSONObject) JSONValue.parse(body);
            return new Metrics(parsed);
        }
        if (conn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
            // Use Instrumentation endpoint
            return null;
        }
        handleError(conn);
        return null;
    }
}
/**
 * Client-side view of the metrics JSON returned by the server, built from its
 * "counters", "gauges", "timers" and "histograms" objects.
 */
public class Metrics {
    private final Map<String, Long> counters;
    private final Map<String, Object> gauges;
    private final Map<String, Timer> timers;
    private final Map<String, Histogram> histograms;

    /**
     * Parses the four metric groups out of the given JSON object.
     *
     * @param json the metrics JSON; each group maps a metric name to a JSON object
     */
    @SuppressWarnings("unchecked")
    public Metrics(JSONObject json) {
        JSONObject jCounters = (JSONObject) json.get("counters");
        counters = new HashMap<>(jCounters.size());
        for (Object entO : jCounters.entrySet()) {
            Entry<String, JSONObject> ent = (Entry<String, JSONObject>) entO;
            counters.put(ent.getKey(), (Long) ent.getValue().get("count"));
        }
        JSONObject jGauges = (JSONObject) json.get("gauges");
        gauges = new HashMap<>(jGauges.size());
        for (Object entO : jGauges.entrySet()) {
            Entry<String, JSONObject> ent = (Entry<String, JSONObject>) entO;
            gauges.put(ent.getKey(), ent.getValue().get("value"));
        }
        JSONObject jTimers = (JSONObject) json.get("timers");
        timers = new HashMap<>(jTimers.size());
        for (Object entO : jTimers.entrySet()) {
            Entry<String, JSONObject> ent = (Entry<String, JSONObject>) entO;
            timers.put(ent.getKey(), new Timer(ent.getValue()));
        }
        JSONObject jHistograms = (JSONObject) json.get("histograms");
        histograms = new HashMap<>(jHistograms.size());
        for (Object entO : jHistograms.entrySet()) {
            Entry<String, JSONObject> ent = (Entry<String, JSONObject>) entO;
            histograms.put(ent.getKey(), new Histogram(ent.getValue()));
        }
    }

    /** @return counter name to its "count" value */
    public Map<String, Long> getCounters() {
        return counters;
    }

    /** @return gauge name to its "value" entry */
    public Map<String, Object> getGauges() {
        return gauges;
    }

    /** @return timer name to its parsed {@link Timer} */
    public Map<String, Timer> getTimers() {
        return timers;
    }

    /** @return histogram name to its parsed {@link Histogram} */
    public Map<String, Histogram> getHistograms() {
        return histograms;
    }

    /**
     * A histogram extended with rate figures and the units reported alongside them.
     */
    public class Timer extends Histogram {
        private final double m15Rate;
        private final double m5Rate;
        private final double m1Rate;
        private final double meanRate;
        private final String durationUnits;
        private final String rateUnits;

        /**
         * @param json the JSON object of one timer; must contain the histogram keys plus
         *             m15_rate, m5_rate, m1_rate, mean_rate, duration_units and rate_units
         */
        public Timer(JSONObject json) {
            super(json);
            m15Rate = Double.parseDouble(json.get("m15_rate").toString());
            m5Rate = Double.parseDouble(json.get("m5_rate").toString());
            m1Rate = Double.parseDouble(json.get("m1_rate").toString());
            meanRate = Double.parseDouble(json.get("mean_rate").toString());
            durationUnits = json.get("duration_units").toString();
            rateUnits = json.get("rate_units").toString();
        }

        public double get15MinuteRate() {
            return m15Rate;
        }

        public double get5MinuteRate() {
            return m5Rate;
        }

        public double get1MinuteRate() {
            return m1Rate;
        }

        public double getMeanRate() {
            return meanRate;
        }

        public String getDurationUnits() {
            return durationUnits;
        }

        public String getRateUnits() {
            return rateUnits;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder(super.toString());
            sb.append("\n\t15 minute rate : ").append(m15Rate);
            sb.append("\n\t5 minute rate : ").append(m5Rate);
            // The 1 minute line must report m1Rate, not the 15 minute rate.
            sb.append("\n\t1 minute rate : ").append(m1Rate);
            sb.append("\n\tmean rate : ").append(meanRate);
            sb.append("\n\tduration units : ").append(durationUnits);
            sb.append("\n\trate units : ").append(rateUnits);
            return sb.toString();
        }
    }

    /**
     * Percentiles and summary statistics of one histogram, parsed from its JSON object.
     */
    public class Histogram {
        private final double p999;
        private final double p99;
        private final double p98;
        private final double p95;
        private final double p75;
        private final double p50;
        private final double mean;
        private final double min;
        private final double max;
        private final double stdDev;
        private final long count;

        /**
         * @param json the JSON object of one histogram; must contain p999, p99, p98, p95,
         *             p75, p50, mean, min, max, stddev and count
         */
        public Histogram(JSONObject json) {
            p999 = Double.parseDouble(json.get("p999").toString());
            p99 = Double.parseDouble(json.get("p99").toString());
            p98 = Double.parseDouble(json.get("p98").toString());
            p95 = Double.parseDouble(json.get("p95").toString());
            p75 = Double.parseDouble(json.get("p75").toString());
            p50 = Double.parseDouble(json.get("p50").toString());
            mean = Double.parseDouble(json.get("mean").toString());
            min = Double.parseDouble(json.get("min").toString());
            max = Double.parseDouble(json.get("max").toString());
            stdDev = Double.parseDouble(json.get("stddev").toString());
            count = Long.parseLong(json.get("count").toString());
        }

        public double get999thPercentile() {
            return p999;
        }

        public double get99thPercentile() {
            return p99;
        }

        public double get98thPercentile() {
            return p98;
        }

        public double get95thPercentile() {
            return p95;
        }

        public double get75thPercentile() {
            return p75;
        }

        public double get50thPercentile() {
            return p50;
        }

        public double getMean() {
            return mean;
        }

        public double getMin() {
            return min;
        }

        public double getMax() {
            return max;
        }

        public double getStandardDeviation() {
            return stdDev;
        }

        public long getCount() {
            return count;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("\t999th percentile : ").append(p999);
            sb.append("\n\t99th percentile : ").append(p99);
            sb.append("\n\t98th percentile : ").append(p98);
            sb.append("\n\t95th percentile : ").append(p95);
            sb.append("\n\t75th percentile : ").append(p75);
            sb.append("\n\t50th percentile : ").append(p50);
            sb.append("\n\tmean : ").append(mean);
            sb.append("\n\tmax : ").append(max);
            sb.append("\n\tmin : ").append(min);
            sb.append("\n\tcount : ").append(count);
            sb.append("\n\tstandard deviation : ").append(stdDev);
            return sb.toString();
        }
    }
}
/**
 * Fetches the Oozie metrics. If null is returned, then try {@link #getInstrumentation()}.
 *
 * @return the Oozie metrics or null.
 * @throws OozieClientException throw if the metrics could not be retrieved.
 */
public Metrics getMetrics() throws OozieClientException {
    final GetMetrics request = new GetMetrics();
    return request.call();
}
/**
 * GET request on the admin instrumentation resource. Returns the parsed instrumentation on
 * HTTP 200 and {@code null} on HTTP 503, in which case the metrics endpoint should be used instead.
 */
private class GetInstrumentation extends ClientCallable<Instrumentation> {

    GetInstrumentation() {
        super("GET", RestConstants.ADMIN, RestConstants.ADMIN_INSTRUMENTATION_RESOURCE, prepareParams());
    }

    @Override
    protected Instrumentation call(HttpURLConnection conn) throws IOException, OozieClientException {
        if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
            final Reader body = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8);
            final JSONObject parsed = (JSONObject) JSONValue.parse(body);
            return new Instrumentation(parsed);
        }
        if (conn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
            // Use Metrics endpoint
            return null;
        }
        handleError(conn);
        return null;
    }
}
public class Instrumentation {
private Map<String, Long> counters;
private Map<String, Object> variables;
private Map<String, Double> samplers;
private Map<String, Timer> timers;
public Instrumentation(JSONObject json) {
JSONArray jCounters = (JSONArray) json.get("counters");
counters = new HashMap<>(jCounters.size());
for (Object groupO : jCounters) {
JSONObject group = (JSONObject) groupO;
String groupName = group.get("group").toString() + ".";
JSONArray data = (JSONArray) group.get("data");
for (Object datO : data) {
JSONObject dat = (JSONObject) datO;
counters.put(groupName + dat.get("name").toString(), Long.valueOf(dat.get("value").toString()));
}
}
JSONArray jVariables = (JSONArray) json.get("variables");
variables = new HashMap<>(jVariables.size());
for (Object groupO : jVariables) {
JSONObject group = (JSONObject) groupO;
String groupName = group.get("group").toString() + ".";
JSONArray data = (JSONArray) group.get("data");
for (Object datO : data) {
JSONObject dat = (JSONObject) datO;
variables.put(groupName + dat.get("name").toString(), dat.get("value"));
}
}
JSONArray jSamplers = (JSONArray) json.get("samplers");
samplers = new HashMap<>(jSamplers.size());
for (Object groupO : jSamplers) {
JSONObject group = (JSONObject) groupO;
String groupName = group.get("group").toString() + ".";
JSONArray data = (JSONArray) group.get("data");
for (Object datO : data) {
JSONObject dat = (JSONObject) datO;
samplers.put(groupName + dat.get("name").toString(), Double.valueOf(dat.get("value").toString()));
}
}
JSONArray jTimers = (JSONArray) json.get("timers");
timers = new HashMap<>(jTimers.size());
for (Object groupO : jTimers) {
JSONObject group = (JSONObject) groupO;
String groupName = group.get("group").toString() + ".";
JSONArray data = (JSONArray) group.get("data");
for (Object datO : data) {
JSONObject dat = (JSONObject) datO;
timers.put(groupName + dat.get("name").toString(), new Timer(dat));
}
}
}
/**
 * Read-only holder for the statistics of one instrumentation timer.
 * <p>
 * All values are taken verbatim from the JSON element passed to the constructor; the unit of
 * the time values is whatever the producer of that JSON uses (not visible here).
 */
public class Timer {
    private final double ownTimeStdDev;
    private final long ownTimeAvg;
    private final long ownMaxTime;
    private final long ownMinTime;
    private final double totalTimeStdDev;
    private final long totalTimeAvg;
    private final long totalMaxTime;
    private final long totalMinTime;
    private final long ticks;

    /**
     * Create a timer from its JSON representation.
     * <p>
     * Each statistic is read by key, converted to a string and parsed, so the JSON value may
     * be a number or a numeric string.
     *
     * @param json JSON element holding the keys {@code ownTimeStdDev}, {@code ownTimeAvg},
     *        {@code ownMaxTime}, {@code ownMinTime}, {@code totalTimeStdDev},
     *        {@code totalTimeAvg}, {@code totalMaxTime}, {@code totalMinTime} and {@code ticks}
     * @throws NullPointerException if one of the keys is missing
     * @throws NumberFormatException if one of the values is not parseable as a number
     */
    public Timer(JSONObject json) {
        // parseDouble/parseLong yield primitives directly; valueOf would box and then unbox.
        ownTimeStdDev = Double.parseDouble(json.get("ownTimeStdDev").toString());
        ownTimeAvg = Long.parseLong(json.get("ownTimeAvg").toString());
        ownMaxTime = Long.parseLong(json.get("ownMaxTime").toString());
        ownMinTime = Long.parseLong(json.get("ownMinTime").toString());
        totalTimeStdDev = Double.parseDouble(json.get("totalTimeStdDev").toString());
        totalTimeAvg = Long.parseLong(json.get("totalTimeAvg").toString());
        totalMaxTime = Long.parseLong(json.get("totalMaxTime").toString());
        totalMinTime = Long.parseLong(json.get("totalMinTime").toString());
        ticks = Long.parseLong(json.get("ticks").toString());
    }

    /**
     * @return the value read from {@code ownTimeStdDev}
     */
    public double getOwnTimeStandardDeviation() {
        return ownTimeStdDev;
    }

    /**
     * @return the value read from {@code ownTimeAvg}
     */
    public long getOwnTimeAverage() {
        return ownTimeAvg;
    }

    /**
     * @return the value read from {@code ownMaxTime}
     */
    public long getOwnMaxTime() {
        return ownMaxTime;
    }

    /**
     * @return the value read from {@code ownMinTime}
     */
    public long getOwnMinTime() {
        return ownMinTime;
    }

    /**
     * @return the value read from {@code totalTimeStdDev}
     */
    public double getTotalTimeStandardDeviation() {
        return totalTimeStdDev;
    }

    /**
     * @return the value read from {@code totalTimeAvg}
     */
    public long getTotalTimeAverage() {
        return totalTimeAvg;
    }

    /**
     * @return the value read from {@code totalMaxTime}
     */
    public long getTotalMaxTime() {
        return totalMaxTime;
    }

    /**
     * @return the value read from {@code totalMinTime}
     */
    public long getTotalMinTime() {
        return totalMinTime;
    }

    /**
     * @return the value read from {@code ticks}
     */
    public long getTicks() {
        return ticks;
    }

    /**
     * Render all statistics as a tab-indented, multi-line text, one statistic per line.
     *
     * @return the textual representation of this timer
     */
    @Override
    public String toString() {
        return new StringBuilder()
                .append("\town time standard deviation : ").append(ownTimeStdDev)
                .append("\n\town average time : ").append(ownTimeAvg)
                .append("\n\town max time : ").append(ownMaxTime)
                .append("\n\town min time : ").append(ownMinTime)
                .append("\n\ttotal time standard deviation : ").append(totalTimeStdDev)
                .append("\n\ttotal average time : ").append(totalTimeAvg)
                .append("\n\ttotal max time : ").append(totalMaxTime)
                .append("\n\ttotal min time : ").append(totalMinTime)
                .append("\n\tticks : ").append(ticks)
                .toString();
    }
}
/**
 * Return the counters held by this instance.
 * <p>
 * The internal map itself is returned, not a copy.
 *
 * @return the counters map
 */
public Map<String, Long> getCounters() {
    return this.counters;
}
/**
 * Return the variables held by this instance.
 * <p>
 * The internal map itself is returned, not a copy.
 *
 * @return the variables map
 */
public Map<String, Object> getVariables() {
    return this.variables;
}
/**
 * Return the samplers held by this instance.
 * <p>
 * The internal map itself is returned, not a copy.
 *
 * @return the samplers map
 */
public Map<String, Double> getSamplers() {
    return this.samplers;
}
/**
 * Return the timers held by this instance.
 * <p>
 * The internal map itself is returned, not a copy.
 *
 * @return the timers map
 */
public Map<String, Timer> getTimers() {
    return this.timers;
}
}
/**
 * Return the Oozie instrumentation. If {@code null} is returned, try {@link #getMetrics()} instead.
 *
 * @return the Oozie instrumentation, or {@code null}.
 * @throws OozieClientException thrown if the instrumentation could not be retrieved.
 */
public Instrumentation getInstrumentation() throws OozieClientException {
    final Instrumentation instrumentation = new GetInstrumentation().call();
    return instrumentation;
}
/**
 * Check that a string is neither null nor empty.
 *
 * @param str the string to check
 * @param name the name to present in the error message
 * @return {@code str} itself, when it passed the check
 * @throws IllegalArgumentException if {@code str} is null or has zero length
 */
public static String notEmpty(String str, String name) {
    if (str != null && !str.isEmpty()) {
        return str;
    }
    // Only the two failure cases reach this point; null is reported in preference to empty.
    final String complaint = (str == null) ? " cannot be null" : " cannot be empty";
    throw new IllegalArgumentException(name + complaint);
}
/**
 * Check that an object is not null.
 *
 * @param <T> the type of the object
 * @param obj the object to check
 * @param name the name to present in the error message
 * @return {@code obj} itself, when it is not null
 * @throws IllegalArgumentException if {@code obj} is null
 */
public static <T> T notNull(T obj, String name) {
    if (obj != null) {
        return obj;
    }
    throw new IllegalArgumentException(name + " cannot be null");
}
}