blob: e985a5f5adeee226cbab4c4f80a650ae739d6b60 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Timestamp;
import java.text.MessageFormat;
import java.util.Date;
import java.util.List;
import javax.persistence.Basic;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Lob;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Table;
import javax.persistence.Transient;
import org.apache.hadoop.io.Writable;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.JsonUtils;
import org.apache.oozie.coord.input.dependency.CoordInputDependency;
import org.apache.oozie.coord.input.dependency.CoordInputDependencyFactory;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.WritableUtils;
import org.apache.openjpa.persistence.jdbc.Index;
import org.apache.openjpa.persistence.jdbc.Strategy;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
@Entity
@NamedQueries({
@NamedQuery(name = "UPDATE_COORD_ACTION", query = "update CoordinatorActionBean w set w.actionNumber = :actionNumber,"
+ " w.actionXml = :actionXml, w.consoleUrl = :consoleUrl, w.createdConf = :createdConf, w.errorCode = :errorCode,"
+ " w.errorMessage = :errorMessage, w.externalStatus = :externalStatus, w.missingDependencies "
+ "= :missingDependencies, w.runConf = :runConf, w.timeOut = :timeOut, w.trackerUri = :trackerUri, w.type "
+ "= :type, w.createdTimestamp = :createdTime, w.externalId = :externalId, w.jobId = :jobId,"
+ " w.lastModifiedTimestamp = :lastModifiedTime, w.nominalTimestamp = :nominalTime, w.slaXml = :slaXml,"
+ " w.statusStr = :status where w.id = :id"),
@NamedQuery(name = "UPDATE_COORD_ACTION_MIN", query = "update CoordinatorActionBean w set w.actionXml = :actionXml,"
+ " w.missingDependencies = :missingDependencies, w.lastModifiedTimestamp = :lastModifiedTime, w.statusStr "
+ "= :status where w.id = :id"),
// Query to update the action status, pending status and last modified time stamp of a Coordinator action
@NamedQuery(name = "UPDATE_COORD_ACTION_STATUS_PENDING_TIME", query = "update CoordinatorActionBean w set w.statusStr"
+ " =:status, w.pending =:pending, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
// Update query for InputCheck
@NamedQuery(name = "UPDATE_COORD_ACTION_FOR_INPUTCHECK", query = "update CoordinatorActionBean w set w.statusStr "
+ "= :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.missingDependencies "
+ "= :missingDependencies where w.id = :id"),
// Update query for Push-based missing dependency check
@NamedQuery(name = "UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK", query = "update CoordinatorActionBean w set w.statusStr "
+ "= :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.pushMissingDependencies "
+ "= :pushMissingDependencies where w.id = :id"),
// Update query for Push-based missing dependency check
@NamedQuery(name = "UPDATE_COORD_ACTION_DEPENDENCIES", query = "update CoordinatorActionBean w set w.missingDependencies"
+ " = :missingDependencies, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"),
// Update query for Start
@NamedQuery(name = "UPDATE_COORD_ACTION_FOR_START", query = "update CoordinatorActionBean w set w.statusStr =:status,"
+ " w.lastModifiedTimestamp = :lastModifiedTime, w.runConf = :runConf, w.externalId = :externalId, w.pending "
+ "= :pending, w.errorCode = :errorCode, w.errorMessage = :errorMessage where w.id = :id"),
@NamedQuery(name = "UPDATE_COORD_ACTION_FOR_MODIFIED_DATE", query = "update CoordinatorActionBean w set"
+ " w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
@NamedQuery(name = "UPDATE_COORD_ACTION_RERUN", query = "update CoordinatorActionBean w set w.actionXml =:actionXml,"
+ " w.statusStr = :status, w.externalId = :externalId, w.externalStatus = :externalStatus, w.rerunTimestamp "
+ "= :rerunTime, w.lastModifiedTimestamp = :lastModifiedTime, w.createdTimestamp = :createdTime, w.createdConf "
+ "= :createdConf, w.runConf = :runConf, w.missingDependencies = :missingDependencies, w.pushMissingDependencies "
+ "= :pushMissingDependencies, w.errorCode = :errorCode, w.errorMessage = :errorMessage where w.id = :id"),
@NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId"
+ " = :jobId and (a.statusStr = 'SUCCEEDED' OR a.statusStr = 'FAILED' OR a.statusStr= 'KILLED')"),
@NamedQuery(name = "DELETE_ACTIONS_FOR_LONG_RUNNING_COORDINATOR", query = "delete from CoordinatorActionBean a where a.id"
+ " IN (:actionId)"),
@NamedQuery(name = "DELETE_UNSCHEDULED_ACTION", query = "delete from CoordinatorActionBean a where a.id = :id "
+ "and (a.statusStr = 'WAITING' OR a.statusStr = 'READY')"),
@NamedQuery(name = "GET_COORD_ACTIONS_FOR_COORDINATOR", query = "select a.id from CoordinatorActionBean a where a.jobId"
+ " = :jobId"),
// Query used by XTestcase to setup tables
@NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from CoordinatorActionBean w"),
// Select query used only by test cases
@NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"),
// Select query used by SLAService on restart
@NamedQuery(name = "GET_COORD_ACTION_FOR_SLA", query = "select a.id, a.jobId, a.statusStr, a.externalId,"
+ " a.lastModifiedTimestamp from CoordinatorActionBean a where a.id = :id"),
// Select query used by ActionInfo command
@NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode,"
+ " a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp,"
+ " a.statusStr, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies "
+ "from CoordinatorActionBean a where a.id = :id"),
// Select Query used by Timeout and skip commands
@NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.statusStr, a.runConf, a.pending,"
+ " a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
// Select query used by InputCheck command
@NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.actionNumber, a.jobId, a.statusStr,"
+ " a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies,"
+ " a.pushMissingDependencies, a.timeOut, a.externalId from CoordinatorActionBean a where a.id = :id"),
// Select query used by CoordActionUpdate command
@NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.statusStr, a.pending,"
+ " a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp "
+ "from CoordinatorActionBean a where a.externalId = :externalId"),
// Select query used by Check command
@NamedQuery(name = "GET_COORD_ACTION_FOR_CHECK", query = "select a.id, a.jobId, a.statusStr, a.pending, a.externalId,"
+ " a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a"
+ " where a.id = :id"),
// Select query used by Start command
@NamedQuery(name = "GET_COORD_ACTION_FOR_START", query = "select a.id, a.jobId, a.statusStr, a.pending, a.createdConf,"
+ " a.slaXml, a.actionXml, a.externalId, a.errorMessage, a.errorCode, a.nominalTimestamp, a.createdTimestamp "
+ "from CoordinatorActionBean a where a.id = :id"),
@NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select a.id, a.actionNumber, a.jobId, a.statusStr,"
+ " a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId "
+ "AND a.statusStr = 'READY' order by a.nominalTimestamp"),
@NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select a.id, a.actionNumber, a.jobId, a.statusStr,"
+ " a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId "
+ "AND a.statusStr = 'READY' order by a.nominalTimestamp desc"),
@NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId"
+ " = :jobId AND (a.statusStr = 'RUNNING' OR a.statusStr='SUBMITTED')"),
@NamedQuery(name = "GET_COORD_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a "
+ "where a.jobId = :jobId"),
@NamedQuery(name = "GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a"
+ " where a.jobId = :jobId AND a.statusStr = 'WAITING'"),
@NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_COUNT", query = "select count(a) from CoordinatorActionBean a "
+ "where a.jobId = :jobId AND a.pending = 0 AND (a.statusStr = 'SUSPENDED' OR a.statusStr = 'TIMEDOUT' "
+ "OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED')"),
@NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_STATUS_COUNT", query = "select count(a) from CoordinatorActionBean a"
+ " where a.jobId = :jobId AND a.pending = 0 AND a.statusStr = :status"),
@NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select count(a) from CoordinatorActionBean a where a.jobId"
+ " = :jobId"),
// Query to retrieve Coordinator actions sorted by nominal time
@NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select a.id, a.actionNumber, a.consoleUrl,"
+ " a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.jobId, a.trackerUri, a.createdTimestamp,"
+ " a.nominalTimestamp, a.statusStr, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies,"
+ " a.timeOut from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
// Query to maintain backward compatibility for coord job info command
@NamedQuery(name = "GET_ALL_COLS_FOR_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select OBJECT(a) "
+ "from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
// Query to retrieve action id, action status, pending status and external Id of not completed Coordinator actions
@NamedQuery(name = "GET_COORD_ACTIONS_NOT_COMPLETED", query = "select a.id, a.statusStr, a.pending, a.externalId,"
+ " a.pushMissingDependencies, a.nominalTimestamp, a.createdTimestamp, a.jobId from CoordinatorActionBean a"
+ " where a.jobId = :jobId AND a.statusStr <> 'FAILED' AND a.statusStr <> 'TIMEDOUT' AND a.statusStr "
+ "<> 'SUCCEEDED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'IGNORED'"),
// Query to retrieve action id, action status, pending status and external Id of running Coordinator actions
@NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id, a.statusStr, a.pending, a.externalId,"
+ " a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId "
+ "and a.statusStr = 'RUNNING'"),
// Query to retrieve action id, action status, pending status and external Id of suspended Coordinator actions
@NamedQuery(name = "GET_COORD_ACTIONS_SUSPENDED", query = "select a.id, a.statusStr, a.pending, a.externalId,"
+ " a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId "
+ "and a.statusStr = 'SUSPENDED'"),
// Query to retrieve count of Coordinator actions which are pending
@NamedQuery(name = "GET_COORD_ACTIONS_PENDING_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId"
+ " = :jobId AND a.pending > 0"),
// Query to retrieve status of Coordinator actions
@NamedQuery(name = "GET_COORD_ACTIONS_STATUS_UNIGNORED", query = "select a.statusStr, a.pending "
+ "from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr <> 'IGNORED'"),
// Query to retrieve status of Coordinator actions
@NamedQuery(name = "GET_COORD_ACTION_STATUS", query = "select a.statusStr from CoordinatorActionBean a where a.id = :id"),
@NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select a.id from CoordinatorActionBean a"
+ " where a.jobId = :jobId AND a.actionNumber = :actionNumber"),
@NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select a.jobId from CoordinatorActionBean a "
+ "where a.lastModifiedTimestamp >= :lastModifiedTime"),
//Used by coordinator store only
@NamedQuery(name = "GET_RUNNING_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a "
+ "where a.jobId = :jobId AND a.statusStr = 'RUNNING'"),
@NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select a.id from CoordinatorActionBean a where a.statusStr"
+ " = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"),
@NamedQuery(name = "GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr,"
+ " a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.statusStr = 'WAITING' "
+ "OR a.statusStr = 'SUBMITTED' OR a.statusStr = 'READY') AND a.lastModifiedTimestamp <= :lastModifiedTime "
+ "and a.nominalTimestamp <= :currentTime and a.jobId in ( select w.id from CoordinatorJobBean w"
+ " where w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR')"),
@NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId,"
+ " a.pending from CoordinatorActionBean a where a.pending > 0 AND (a.statusStr = 'SUSPENDED' OR a.statusStr "
+ "= 'KILLED' OR a.statusStr = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
// Select query used by rerun, requires almost all columns so select * is used
@NamedQuery(name = "GET_TERMINATED_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a "
+ "where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' "
+ "OR a.statusStr = 'FAILED' OR a.statusStr = 'IGNORED') AND a.nominalTimestamp >= :startTime "
+ "AND a.nominalTimestamp <= :endTime"),
// Select query used by log
@NamedQuery(name = "GET_TERMINATED_ACTION_IDS_FOR_DATES", query = "select a.id from CoordinatorActionBean a where a.jobId"
+ " = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr"
+ " = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
// Select query used by rerun, requires almost all columns so select * is used
@NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId "
+ "= :jobId AND a.nominalTimestamp = :nominalTime"),
@NamedQuery(name = "GET_ACTIVE_ACTIONS_FOR_DATES", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending,"
+ " a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr "
+ "= 'WAITING' OR a.statusStr = 'READY' OR a.statusStr = 'SUBMITTED' OR a.statusStr = 'RUNNING' OR a.statusStr "
+ "= 'SUSPENDED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
@NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w"),
@NamedQuery(name = "GET_COORD_ACTIONS_COUNT_RUNNING_FOR_RANGE", query = "select count(w) from CoordinatorActionBean w"
+ " where w.statusStr = 'RUNNING' and w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction"),
@NamedQuery(name = "GET_COORD_ACTIONS_MAX_MODIFIED_DATE_FOR_RANGE", query = "select max(w.lastModifiedTimestamp) "
+ "from CoordinatorActionBean w where w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction"),
@NamedQuery(name = "GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE", query = "select a.id, a.nominalTimestamp,"
+ " a.createdTimestamp, a.actionXml from CoordinatorActionBean a where a.id in (:ids)"
+ " and (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr "
+ "<> 'TIMEDOUT' AND a.statusStr <> 'IGNORED')"),
@NamedQuery(name = "GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE", query = "select a.id, a.nominalTimestamp,"
+ " a.createdTimestamp, a.actionXml from CoordinatorActionBean a where a.jobId = :jobId and (a.statusStr "
+ "<> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT' "
+ "AND a.statusStr <> 'IGNORED')")
})
@Table(name = "COORD_ACTIONS")
public class CoordinatorActionBean implements
Writable,CoordinatorAction,JsonBean {
@Id
private String id;
@Basic
@Index
@Column(name = "job_id")
private String jobId;
@Basic
@Index
@Column(name = "status")
private String statusStr = CoordinatorAction.Status.WAITING.toString();
@Basic
@Index
@Column(name = "nominal_time")
private java.sql.Timestamp nominalTimestamp = null;
@Basic
@Index
@Column(name = "last_modified_time")
private java.sql.Timestamp lastModifiedTimestamp = null;
@Basic
@Index
@Column(name = "created_time")
private java.sql.Timestamp createdTimestamp = null;
@Basic
@Index
@Column(name = "rerun_time")
private java.sql.Timestamp rerunTimestamp = null;
@Basic
@Index
@Column(name = "external_id")
private String externalId;
@Basic
@Column(name = "sla_xml")
@Lob
@Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
private StringBlob slaXml = null;
@Basic
@Column(name = "pending")
private int pending = 0;
@Basic
@Column(name = "job_type")
private String type;
@Basic
@Column(name = "action_number")
private int actionNumber;
@Basic
@Column(name = "created_conf")
@Lob
@Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
private StringBlob createdConf;
@Basic
@Column(name = "time_out")
private int timeOut = 0;
@Basic
@Column(name = "run_conf")
@Lob
@Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
private StringBlob runConf;
@Basic
@Column(name = "action_xml")
@Lob
@Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
private StringBlob actionXml;
@Basic
@Column(name = "missing_dependencies")
@Lob
@Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
private StringBlob missingDependencies;
@Basic
@Column(name = "push_missing_dependencies")
@Lob
@Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
private StringBlob pushMissingDependencies;
@Basic
@Column(name = "external_status")
private String externalStatus;
@Basic
@Column(name = "tracker_uri")
private String trackerUri;
@Basic
@Column(name = "console_url")
private String consoleUrl;
@Basic
@Column(name = "error_code")
private String errorCode;
@Basic
@Column(name = "error_message")
private String errorMessage;
@SuppressWarnings("unchecked")
public JSONObject toJSONObject() {
return toJSONObject("GMT");
}
@Transient
private CoordInputDependency coordPushInputDependency;
@Transient
private CoordInputDependency coordPullInputDependency;
public CoordinatorActionBean() {
}
/**
* Serialize the coordinator bean to a data output.
*
* @param dataOutput data output.
* @throws IOException thrown if the coordinator bean could not be
* serialized.
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeStr(dataOutput, getJobId());
WritableUtils.writeStr(dataOutput, getType());
WritableUtils.writeStr(dataOutput, getId());
WritableUtils.writeStr(dataOutput, getCreatedConf());
WritableUtils.writeStr(dataOutput, getStatus().toString());
dataOutput.writeInt(getActionNumber());
WritableUtils.writeStr(dataOutput, getRunConf());
WritableUtils.writeStr(dataOutput, getExternalStatus());
WritableUtils.writeStr(dataOutput, getTrackerUri());
WritableUtils.writeStr(dataOutput, getConsoleUrl());
WritableUtils.writeStr(dataOutput, getErrorCode());
WritableUtils.writeStr(dataOutput, getErrorMessage());
dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1);
dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1);
}
/**
* Deserialize a coordinator bean from a data input.
*
* @param dataInput data input.
* @throws IOException thrown if the workflow bean could not be
* deserialized.
*/
@Override
public void readFields(DataInput dataInput) throws IOException {
setJobId(WritableUtils.readStr(dataInput));
setType(WritableUtils.readStr(dataInput));
setId(WritableUtils.readStr(dataInput));
setCreatedConf(WritableUtils.readStr(dataInput));
setStatus(CoordinatorAction.Status.valueOf(WritableUtils.readStr(dataInput)));
setActionNumber(dataInput.readInt());
setRunConf(WritableUtils.readStr(dataInput));
setExternalStatus(WritableUtils.readStr(dataInput));
setTrackerUri(WritableUtils.readStr(dataInput));
setConsoleUrl(WritableUtils.readStr(dataInput));
setErrorCode(WritableUtils.readStr(dataInput));
setErrorMessage(WritableUtils.readStr(dataInput));
long d = dataInput.readLong();
if (d != -1) {
setCreatedTime(new Date(d));
}
d = dataInput.readLong();
if (d != -1) {
setLastModifiedTime(new Date(d));
}
}
@Override
public String getJobId() {
return this.jobId;
}
public void setJobId(String id) {
this.jobId = id;
}
@Override
public Status getStatus() {
return Status.valueOf(statusStr);
}
/**
* Return the status in string
* @return statusStr
*/
public String getStatusStr() {
return statusStr;
}
public void setStatus(Status status) {
this.statusStr = status.toString();
}
public void setStatusStr(String statusStr) {
this.statusStr = statusStr;
}
public void setCreatedTime(Date createdTime) {
this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime);
}
public void setRerunTime(Date rerunTime) {
this.rerunTimestamp = DateUtils.convertDateToTimestamp(rerunTime);
}
public void setNominalTime(Date nominalTime) {
this.nominalTimestamp = DateUtils.convertDateToTimestamp(nominalTime);
}
public void setLastModifiedTime(Date lastModifiedTime) {
this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime);
}
public Date getCreatedTime() {
return DateUtils.toDate(createdTimestamp);
}
public Timestamp getCreatedTimestamp() {
return createdTimestamp;
}
public Date getRerunTime() {
return DateUtils.toDate(rerunTimestamp);
}
public Timestamp getRerunTimestamp() {
return rerunTimestamp;
}
@Override
public Date getLastModifiedTime() {
return DateUtils.toDate(lastModifiedTimestamp);
}
public Timestamp getLastModifiedTimestamp() {
return lastModifiedTimestamp;
}
@Override
public Date getNominalTime() {
return DateUtils.toDate(nominalTimestamp);
}
public Timestamp getNominalTimestamp() {
return nominalTimestamp;
}
@Override
public String getExternalId() {
return externalId;
}
public void setExternalId(String externalId) {
this.externalId = externalId;
}
public StringBlob getSlaXmlBlob() {
return slaXml;
}
public void setSlaXmlBlob(StringBlob slaXml) {
this.slaXml = slaXml;
}
public String getSlaXml() {
return slaXml == null ? null : slaXml.getString();
}
public void setSlaXml(String slaXml) {
if (this.slaXml == null) {
this.slaXml = new StringBlob(slaXml);
}
else {
this.slaXml.setString(slaXml);
}
}
/**
* @return true if in terminal status
*/
public boolean isTerminalStatus() {
boolean isTerminal = true;
switch (getStatus()) {
case WAITING:
case READY:
case SUBMITTED:
case RUNNING:
case SUSPENDED:
isTerminal = false;
break;
default:
isTerminal = true;
break;
}
return isTerminal;
}
/**
* Return if the action is complete with failure.
*
* @return if the action is complete with failure.
*/
public boolean isTerminalWithFailure() {
boolean result = false;
switch (getStatus()) {
case FAILED:
case KILLED:
case TIMEDOUT:
result = true;
}
return result;
}
/**
* Set some actions are in progress for particular coordinator action.
*
* @param pending set pending to true
*/
public void setPending(int pending) {
this.pending = pending;
}
/**
* increment pending and return it
*
* @return pending
*/
public int incrementAndGetPending() {
this.pending++;
return pending;
}
/**
* decrement pending and return it
*
* @return pending
*/
public int decrementAndGetPending() {
this.pending = Math.max(this.pending - 1, 0);
return pending;
}
/**
* Get some actions are in progress for particular bundle action.
*
* @return pending
*/
public int getPending() {
return this.pending;
}
/**
* Return if the action is pending.
*
* @return if the action is pending.
*/
public boolean isPending() {
return pending > 0 ? true : false;
}
@Override
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public void setActionNumber(int actionNumber) {
this.actionNumber = actionNumber;
}
@Override
public int getActionNumber() {
return actionNumber;
}
@Override
public String getCreatedConf() {
return createdConf == null ? null : createdConf.getString();
}
public void setCreatedConf(String createdConf) {
if (this.createdConf == null) {
this.createdConf = new StringBlob(createdConf);
}
else {
this.createdConf.setString(createdConf);
}
}
public void setCreatedConfBlob(StringBlob createdConf) {
this.createdConf = createdConf;
}
public StringBlob getCreatedConfBlob() {
return createdConf;
}
public void setRunConf(String runConf) {
if (this.runConf == null) {
this.runConf = new StringBlob(runConf);
}
else {
this.runConf.setString(runConf);
}
}
@Override
public String getRunConf() {
return runConf == null ? null : runConf.getString();
}
public void setRunConfBlob(StringBlob runConf) {
this.runConf = runConf;
}
public StringBlob getRunConfBlob() {
return runConf;
}
public void setMissingDependencies(String missingDependencies) {
if (this.missingDependencies == null) {
this.missingDependencies = new StringBlob(missingDependencies);
}
else {
this.missingDependencies.setString(missingDependencies);
}
}
@Override
public String getMissingDependencies() {
return missingDependencies == null ? null : missingDependencies.getString();
}
public void setMissingDependenciesBlob(StringBlob missingDependencies) {
this.missingDependencies = missingDependencies;
}
public StringBlob getMissingDependenciesBlob() {
return missingDependencies;
}
@Override
public String getPushMissingDependencies() {
return pushMissingDependencies == null ? null : pushMissingDependencies.getString();
}
public void setPushMissingDependencies(String pushMissingDependencies) {
if (this.pushMissingDependencies == null) {
this.pushMissingDependencies = new StringBlob(pushMissingDependencies);
}
else {
this.pushMissingDependencies.setString(pushMissingDependencies);
}
}
public void setPushMissingDependenciesBlob(StringBlob pushMissingDependencies) {
this.pushMissingDependencies = pushMissingDependencies;
}
public StringBlob getPushMissingDependenciesBlob() {
return pushMissingDependencies;
}
public String getExternalStatus() {
return externalStatus;
}
public void setExternalStatus(String externalStatus) {
this.externalStatus = externalStatus;
}
@Override
public String getTrackerUri() {
return trackerUri;
}
public void setTrackerUri(String trackerUri) {
this.trackerUri = trackerUri;
}
@Override
public String getConsoleUrl() {
return consoleUrl;
}
public void setConsoleUrl(String consoleUrl) {
this.consoleUrl = consoleUrl;
}
@Override
public String getErrorCode() {
return errorCode;
}
@Override
public String getErrorMessage() {
return errorMessage;
}
public void setErrorInfo(String errorCode, String errorMessage) {
this.errorCode = errorCode;
this.errorMessage = errorMessage;
}
public String getActionXml() {
return actionXml == null ? null : actionXml.getString();
}
public void setActionXml(String actionXml) {
if (this.actionXml == null) {
this.actionXml = new StringBlob(actionXml);
}
else {
this.actionXml.setString(actionXml);
}
}
public void setActionXmlBlob(StringBlob actionXml) {
this.actionXml = actionXml;
}
public StringBlob getActionXmlBlob() {
return actionXml;
}
@Override
public String toString() {
return MessageFormat.format("CoordinatorAction name[{0}] status[{1}]",
getId(), getStatus());
}
public int getTimeOut() {
return timeOut;
}
public void setTimeOut(int timeOut) {
this.timeOut = timeOut;
}
public void setErrorCode(String errorCode) {
this.errorCode = errorCode;
}
public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
@SuppressWarnings("unchecked")
public JSONObject toJSONObject(String timeZoneId) {
JSONObject json = new JSONObject();
json.put(JsonTags.COORDINATOR_ACTION_ID, id);
json.put(JsonTags.COORDINATOR_JOB_ID, jobId);
json.put(JsonTags.COORDINATOR_ACTION_TYPE, type);
json.put(JsonTags.COORDINATOR_ACTION_NUMBER, actionNumber);
json.put(JsonTags.COORDINATOR_ACTION_CREATED_CONF, getCreatedConf());
json.put(JsonTags.COORDINATOR_ACTION_CREATED_TIME, JsonUtils.formatDateRfc822(getCreatedTime(), timeZoneId));
json.put(JsonTags.COORDINATOR_ACTION_NOMINAL_TIME, JsonUtils.formatDateRfc822(getNominalTime(), timeZoneId));
json.put(JsonTags.COORDINATOR_ACTION_EXTERNALID, externalId);
// json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils
// .formatDateRfc822(startTime), timeZoneId);
json.put(JsonTags.COORDINATOR_ACTION_STATUS, statusStr);
json.put(JsonTags.COORDINATOR_ACTION_RUNTIME_CONF, getRunConf());
json.put(JsonTags.COORDINATOR_ACTION_LAST_MODIFIED_TIME,
JsonUtils.formatDateRfc822(getLastModifiedTime(), timeZoneId));
// json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils
// .formatDateRfc822(startTime), timeZoneId);
// json.put(JsonTags.COORDINATOR_ACTION_END_TIME, JsonUtils
// .formatDateRfc822(endTime), timeZoneId);
json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, getPullInputDependencies().getMissingDependencies());
json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, getPushInputDependencies().getMissingDependencies());
json.put(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, externalStatus);
json.put(JsonTags.COORDINATOR_ACTION_TRACKER_URI, trackerUri);
json.put(JsonTags.COORDINATOR_ACTION_CONSOLE_URL, consoleUrl);
json.put(JsonTags.COORDINATOR_ACTION_ERROR_CODE, errorCode);
json.put(JsonTags.COORDINATOR_ACTION_ERROR_MESSAGE, errorMessage);
json.put(JsonTags.TO_STRING, toString());
return json;
}
/**
* Convert a nodes list into a JSONArray.
*
* @param actions nodes list.
* @param timeZoneId time zone to use for dates in the JSON array.
* @return the corresponding JSON array.
*/
@SuppressWarnings("unchecked")
public static JSONArray toJSONArray(List<CoordinatorActionBean> actions, String timeZoneId) {
JSONArray array = new JSONArray();
for (CoordinatorActionBean action : actions) {
array.add(action.toJSONObject(timeZoneId));
}
return array;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((id == null) ? 0 : id.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
CoordinatorActionBean other = (CoordinatorActionBean) obj;
if (id == null) {
if (other.id != null) {
return false;
}
}
else if (!id.equals(other.id)) {
return false;
}
return true;
}
public CoordInputDependency getPullInputDependencies() {
if (coordPullInputDependency == null) {
coordPullInputDependency = CoordInputDependencyFactory.getPullInputDependencies(missingDependencies);
}
return coordPullInputDependency;
}
public CoordInputDependency getPushInputDependencies() {
if (coordPushInputDependency == null) {
coordPushInputDependency = CoordInputDependencyFactory.getPushInputDependencies(pushMissingDependencies);
}
return coordPushInputDependency;
}
public void setPullInputDependencies(CoordInputDependency coordPullInputDependency) {
this.coordPullInputDependency = coordPullInputDependency;
}
public void setPushInputDependencies(CoordInputDependency coordPushInputDependency) {
this.coordPushInputDependency = coordPushInputDependency;
}
}