blob: 10981d190155d4353633cdb868d08cd56669c74b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Date;
import java.util.Properties;
import javax.persistence.Basic;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Lob;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Transient;
import org.apache.hadoop.io.Writable;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.rest.JsonWorkflowAction;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.WritableUtils;
import org.apache.openjpa.persistence.jdbc.Index;
/**
* Bean that contains all the information to start an action for a workflow node.
*/
@Entity
@NamedQueries({
    @NamedQuery(name = "UPDATE_ACTION", query = "update WorkflowActionBean a set a.conf = :conf, a.consoleUrl = :consoleUrl, a.data = :data, a.errorCode = :errorCode, a.errorMessage = :errorMessage, a.externalId = :externalId, a.externalStatus = :externalStatus, a.name = :name, a.cred = :cred , a.retries = :retries, a.trackerUri = :trackerUri, a.transition = :transition, a.type = :type, a.endTimestamp = :endTime, a.executionPath = :executionPath, a.lastCheckTimestamp = :lastCheckTime, a.logToken = :logToken, a.pending = :pending, a.pendingAgeTimestamp = :pendingAge, a.signalValue = :signalValue, a.slaXml = :slaXml, a.startTimestamp = :startTime, a.status = :status, a.wfId=:wfId where a.id = :id"),
    @NamedQuery(name = "DELETE_ACTION", query = "delete from WorkflowActionBean a where a.id = :id"),
    @NamedQuery(name = "DELETE_ACTIONS_FOR_WORKFLOW", query = "delete from WorkflowActionBean a where a.wfId = :wfId"),
    @NamedQuery(name = "GET_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a"),
    @NamedQuery(name = "GET_ACTION", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
    @NamedQuery(name = "GET_ACTION_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
    @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
    @NamedQuery(name = "GET_ACTIONS_OF_WORKFLOW_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
    @NamedQuery(name = "GET_PENDING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.pendingAgeTimestamp < :pendingAge AND a.status <> 'RUNNING'"),
    @NamedQuery(name = "GET_RUNNING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.status = 'RUNNING' AND a.lastCheckTimestamp < :lastCheckTime"),
    @NamedQuery(name = "GET_RETRY_MANUAL_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId AND (a.status = 'START_RETRY' OR a.status = 'START_MANUAL' OR a.status = 'END_RETRY' OR a.status = 'END_MANUAL')") })
public class WorkflowActionBean extends JsonWorkflowAction implements Writable {

    /** Id of the workflow job this action belongs to. */
    @Basic
    @Index
    @Column(name = "wf_id")
    private String wfId = null;

    /** String form of the action status; kept in sync with the parent's status by {@link #setStatus}. */
    @Basic
    @Index
    @Column(name = "status")
    private String status = WorkflowAction.Status.PREP.toString();

    @Basic
    @Column(name = "last_check_time")
    private Timestamp lastCheckTimestamp;

    @Basic
    @Column(name = "end_time")
    private Timestamp endTimestamp = null;

    @Basic
    @Column(name = "start_time")
    private Timestamp startTimestamp = null;

    @Basic
    @Column(name = "execution_path", length = 1024)
    private String executionPath = null;

    /** Pending flag stored as an integer: 1 means pending, 0 means not pending. */
    @Basic
    @Column(name = "pending")
    private int pending = 0;

    // @Temporal(TemporalType.TIME)
    // @Column(name="pending_age",columnDefinition="timestamp default '0000-00-00 00:00:00'")
    /** Persisted pending age; this is the authoritative value read by {@link #getPendingAge()}. */
    @Basic
    @Index
    @Column(name = "pending_age")
    private Timestamp pendingAgeTimestamp = null;

    @Basic
    @Column(name = "signal_value")
    private String signalValue = null;

    @Basic
    @Column(name = "log_token")
    private String logToken = null;

    /**
     * Non-persisted copy of the pending age. It is only assigned by the setters and
     * {@link #readFields(DataInput)} of this class, so it must not be relied upon as the
     * source of truth; use {@link #getPendingAge()} instead.
     */
    @Transient
    private Date pendingAge;

    @Column(name = "sla_xml")
    @Lob
    private String slaXml = null;

    /**
     * Default constructor.
     */
    public WorkflowActionBean() {
    }

    /**
     * Serialize the action bean to a data output.
     * <p>
     * Absent dates are written as <code>-1</code>. The field order must match
     * {@link #readFields(DataInput)}. The SLA XML is not part of the serialized form.
     *
     * @param dataOutput data output.
     * @throws IOException thrown if the action bean could not be serialized.
     */
    public void write(DataOutput dataOutput) throws IOException {
        WritableUtils.writeStr(dataOutput, getId());
        WritableUtils.writeStr(dataOutput, getName());
        WritableUtils.writeStr(dataOutput, getCred());
        WritableUtils.writeStr(dataOutput, getType());
        WritableUtils.writeStr(dataOutput, getConf());
        WritableUtils.writeStr(dataOutput, getStatusStr());
        dataOutput.writeInt(getRetries());
        dataOutput.writeLong(toEpochMillisOrMinusOne(getStartTime()));
        dataOutput.writeLong(toEpochMillisOrMinusOne(getEndTime()));
        dataOutput.writeLong(toEpochMillisOrMinusOne(getLastCheckTime()));
        WritableUtils.writeStr(dataOutput, getTransition());
        WritableUtils.writeStr(dataOutput, getData());
        WritableUtils.writeStr(dataOutput, getExternalId());
        WritableUtils.writeStr(dataOutput, getExternalStatus());
        WritableUtils.writeStr(dataOutput, getTrackerUri());
        WritableUtils.writeStr(dataOutput, getConsoleUrl());
        WritableUtils.writeStr(dataOutput, getErrorCode());
        WritableUtils.writeStr(dataOutput, getErrorMessage());
        WritableUtils.writeStr(dataOutput, wfId);
        WritableUtils.writeStr(dataOutput, executionPath);
        dataOutput.writeInt(pending);
        // Use the getter (backed by the persisted timestamp) rather than the transient
        // pendingAge field: the transient field is never populated when the bean is loaded
        // through JPA, which would silently drop the pending age from the serialized form.
        dataOutput.writeLong(toEpochMillisOrMinusOne(getPendingAge()));
        WritableUtils.writeStr(dataOutput, signalValue);
        WritableUtils.writeStr(dataOutput, logToken);
        dataOutput.writeInt(getUserRetryCount());
        dataOutput.writeInt(getUserRetryInterval());
        dataOutput.writeInt(getUserRetryMax());
    }

    /**
     * Convert a date to the value used in the serialized form.
     *
     * @param date the date, may be <code>null</code>.
     * @return the epoch milliseconds of the date, or <code>-1</code> if the date is <code>null</code>.
     */
    private static long toEpochMillisOrMinusOne(Date date) {
        return (date != null) ? date.getTime() : -1;
    }

    /**
     * Deserialize an action bean from a data input.
     * <p>
     * Fields are read in the order written by {@link #write(DataOutput)}. A date value of
     * <code>-1</code> means "absent"; in that case the corresponding field is left untouched.
     *
     * @param dataInput data input.
     * @throws IOException thrown if the action bean could not be deserialized.
     */
    public void readFields(DataInput dataInput) throws IOException {
        setId(WritableUtils.readStr(dataInput));
        setName(WritableUtils.readStr(dataInput));
        setCred(WritableUtils.readStr(dataInput));
        setType(WritableUtils.readStr(dataInput));
        setConf(WritableUtils.readStr(dataInput));
        setStatus(WorkflowAction.Status.valueOf(WritableUtils.readStr(dataInput)));
        setRetries(dataInput.readInt());
        // NOTE(review): when a sentinel (-1) is read, the existing value is not cleared, so
        // reusing one bean instance for several reads may keep a stale date - confirm callers
        // always deserialize into a fresh instance.
        long d = dataInput.readLong();
        if (d != -1) {
            setStartTime(new Date(d));
        }
        d = dataInput.readLong();
        if (d != -1) {
            setEndTime(new Date(d));
        }
        d = dataInput.readLong();
        if (d != -1) {
            setLastCheckTime(new Date(d));
        }
        setTransition(WritableUtils.readStr(dataInput));
        setData(WritableUtils.readStr(dataInput));
        setExternalId(WritableUtils.readStr(dataInput));
        setExternalStatus(WritableUtils.readStr(dataInput));
        setTrackerUri(WritableUtils.readStr(dataInput));
        setConsoleUrl(WritableUtils.readStr(dataInput));
        // Read into locals so the stream order (error code first, then message) is explicit.
        String errorCode = WritableUtils.readStr(dataInput);
        String errorMessage = WritableUtils.readStr(dataInput);
        setErrorInfo(errorCode, errorMessage);
        wfId = WritableUtils.readStr(dataInput);
        executionPath = WritableUtils.readStr(dataInput);
        pending = dataInput.readInt();
        d = dataInput.readLong();
        if (d != -1) {
            pendingAge = new Date(d);
            pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
        }
        signalValue = WritableUtils.readStr(dataInput);
        logToken = WritableUtils.readStr(dataInput);
        setUserRetryCount(dataInput.readInt());
        setUserRetryInterval(dataInput.readInt());
        setUserRetryMax(dataInput.readInt());
    }

    /**
     * Return if the action execution is complete.
     *
     * @return <code>true</code> if the action status is {@link WorkflowAction.Status#DONE}.
     */
    public boolean isExecutionComplete() {
        return getStatus() == WorkflowAction.Status.DONE;
    }

    /**
     * Return if the action is START_RETRY or START_MANUAL or END_RETRY or
     * END_MANUAL.
     *
     * @return boolean true if status is START_RETRY or START_MANUAL or END_RETRY or
     *         END_MANUAL
     */
    public boolean isRetryOrManual() {
        return (getStatus() == WorkflowAction.Status.START_RETRY || getStatus() == WorkflowAction.Status.START_MANUAL
                || getStatus() == WorkflowAction.Status.END_RETRY || getStatus() == WorkflowAction.Status.END_MANUAL);
    }

    /**
     * Return true if the action is USER_RETRY
     *
     * @return boolean true if status is USER_RETRY
     */
    public boolean isUserRetry() {
        return (getStatus() == WorkflowAction.Status.USER_RETRY);
    }

    /**
     * Return if the action is complete.
     *
     * @return <code>true</code> if the action status is OK, KILLED or ERROR.
     */
    public boolean isComplete() {
        return getStatus() == WorkflowAction.Status.OK || getStatus() == WorkflowAction.Status.KILLED ||
                getStatus() == WorkflowAction.Status.ERROR;
    }

    /**
     * Set the action pending flag to true, leaving the pending age untouched.
     */
    public void setPendingOnly() {
        pending = 1;
    }

    /**
     * Set the action as pending and the current time as pending age.
     */
    public void setPending() {
        pending = 1;
        pendingAge = new Date();
        pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
    }

    /**
     * Set a time when the action will be pending, normally a time in the future.
     * The pending flag itself is not modified.
     *
     * @param pendingAge the time when the action will be pending.
     */
    public void setPendingAge(Date pendingAge) {
        this.pendingAge = pendingAge;
        this.pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
    }

    /**
     * Return the pending age of the action, derived from the persisted pending age timestamp.
     *
     * @return the pending age of the action, <code>null</code> if the action is not pending.
     */
    public Date getPendingAge() {
        return DateUtils.toDate(pendingAgeTimestamp);
    }

    /**
     * Return if the action is pending.
     *
     * @return if the action is pending.
     */
    public boolean isPending() {
        return pending == 1;
    }

    /**
     * Removes the pending flag and pendingAge from the action.
     */
    public void resetPending() {
        pending = 0;
        pendingAge = null;
        pendingAgeTimestamp = null;
    }

    /**
     * Removes the pending flag from the action, leaving the pending age untouched.
     */
    public void resetPendingOnly() {
        pending = 0;
    }

    /**
     * Increments the number of retries for the action.
     */
    public void incRetries() {
        setRetries(getRetries() + 1);
    }

    /**
     * Set the tracking information for an action, set the start and last check times to the
     * current time, and set the action status to {@link WorkflowAction.Status#RUNNING}.
     *
     * @param externalId external ID for the action.
     * @param trackerUri tracker URI for the action.
     * @param consoleUrl console URL for the action.
     */
    public void setStartData(String externalId, String trackerUri, String consoleUrl) {
        setExternalId(ParamChecker.notEmpty(externalId, "externalId"));
        setTrackerUri(ParamChecker.notEmpty(trackerUri, "trackerUri"));
        setConsoleUrl(ParamChecker.notEmpty(consoleUrl, "consoleUrl"));
        Date now = new Date();
        setStartTime(now);
        setLastCheckTime(now);
        setStatus(Status.RUNNING);
    }

    /**
     * Set the completion information for an action start. Sets the Action status to
     * {@link WorkflowAction.Status#DONE}.
     *
     * @param externalStatus action external end status.
     * @param actionData action output data, <code>null</code> if there is no action output data.
     */
    public void setExecutionData(String externalStatus, Properties actionData) {
        setStatus(Status.DONE);
        setExternalStatus(ParamChecker.notEmpty(externalStatus, "externalStatus"));
        if (actionData != null) {
            setData(PropertiesUtils.propertiesToString(actionData));
        }
    }

    /**
     * Set the completion information for an action end. When the status is
     * {@link WorkflowAction.Status#OK} any previously recorded error information is cleared.
     *
     * @param status action status, {@link WorkflowAction.Status#OK} or
     *        {@link WorkflowAction.Status#ERROR} or {@link WorkflowAction.Status#KILLED}
     * @param signalValue the signal value. In most cases, the value should be OK or ERROR.
     * @throws IllegalArgumentException if the status is <code>null</code> or is not one of
     *         OK, ERROR or KILLED.
     */
    public void setEndData(Status status, String signalValue) {
        // A null status fails all three comparisons, so it is rejected here as well.
        if (status != Status.OK && status != Status.ERROR && status != Status.KILLED) {
            // String concatenation is null-safe; calling status.toString() here would turn a
            // null status into a NullPointerException instead of the intended exception.
            throw new IllegalArgumentException("Action status must be OK, ERROR or KILLED. Received ["
                    + status + "]");
        }
        if (status == Status.OK) {
            setErrorInfo(null, null);
        }
        setStatus(status);
        setSignalValue(ParamChecker.notEmpty(signalValue, "signalValue"));
    }

    /**
     * Return the job Id.
     *
     * @return the job Id.
     */
    public String getJobId() {
        return wfId;
    }

    /**
     * Return the workflow job Id; same value as {@link #getJobId()}.
     *
     * @return the job Id.
     */
    public String getWfId() {
        return wfId;
    }

    /**
     * Set the job id.
     *
     * @param id jobId;
     */
    public void setJobId(String id) {
        this.wfId = id;
    }

    /**
     * Return the SLA XML of the action.
     *
     * @return the SLA XML, may be <code>null</code>.
     */
    public String getSlaXml() {
        return slaXml;
    }

    /**
     * Set the SLA XML of the action.
     *
     * @param slaXml the SLA XML.
     */
    public void setSlaXml(String slaXml) {
        this.slaXml = slaXml;
    }

    /**
     * Set the action status, updating both the persisted string form and the parent's status.
     *
     * @param val the new status; must not be <code>null</code>.
     */
    @Override
    public void setStatus(Status val) {
        this.status = val.toString();
        super.setStatus(val);
    }

    /**
     * Return the action status in its persisted string form.
     *
     * @return the status string.
     */
    public String getStatusStr() {
        return status;
    }

    /**
     * Return the action status, resolved from the persisted status string.
     *
     * @return the action status.
     */
    @Override
    public Status getStatus() {
        return Status.valueOf(this.status);
    }

    /**
     * Return the node execution path.
     *
     * @return the node execution path.
     */
    public String getExecutionPath() {
        return executionPath;
    }

    /**
     * Set the node execution path.
     *
     * @param executionPath the node execution path.
     */
    public void setExecutionPath(String executionPath) {
        this.executionPath = executionPath;
    }

    /**
     * Return the signal value for the action.
     * <p>
     * For decision nodes it is the chosen transition, for actions it is OK or ERROR.
     *
     * @return the action signal value.
     */
    public String getSignalValue() {
        return signalValue;
    }

    /**
     * Set the signal value for the action.
     * <p>
     * For decision nodes it is the chosen transition, for actions it is OK or ERROR.
     *
     * @param signalValue the action signal value.
     */
    public void setSignalValue(String signalValue) {
        this.signalValue = signalValue;
    }

    /**
     * Return the job log token.
     *
     * @return the job log token.
     */
    public String getLogToken() {
        return logToken;
    }

    /**
     * Set the job log token.
     *
     * @param logToken the job log token.
     */
    public void setLogToken(String logToken) {
        this.logToken = logToken;
    }

    /**
     * Return the action last check time
     *
     * @return the last check time
     */
    public Date getLastCheckTime() {
        return DateUtils.toDate(lastCheckTimestamp);
    }

    /**
     * Return the action last check time as the persisted timestamp.
     *
     * @return the last check timestamp
     */
    public Timestamp getLastCheckTimestamp() {
        return lastCheckTimestamp;
    }

    /**
     * Return the action start time as the persisted timestamp.
     *
     * @return the start timestamp
     */
    public Timestamp getStartTimestamp() {
        return startTimestamp;
    }

    /**
     * Return the action end time as the persisted timestamp.
     *
     * @return the end timestamp
     */
    public Timestamp getEndTimestamp() {
        return endTimestamp;
    }

    /**
     * Return the action pending age as the persisted timestamp.
     *
     * @return the pending age timestamp
     */
    public Timestamp getPendingAgeTimestamp() {
        return pendingAgeTimestamp;
    }

    /**
     * Sets the action last check time
     *
     * @param lastCheckTime the last check time to set.
     */
    public void setLastCheckTime(Date lastCheckTime) {
        this.lastCheckTimestamp = DateUtils.convertDateToTimestamp(lastCheckTime);
    }

    /**
     * Return if the action is pending; same value as {@link #isPending()}.
     *
     * @return if the action is pending.
     */
    public boolean getPending() {
        return this.pending == 1;
    }

    /**
     * Return the action start time, derived from the persisted start timestamp.
     *
     * @return the start time.
     */
    @Override
    public Date getStartTime() {
        return DateUtils.toDate(startTimestamp);
    }

    /**
     * Set the action start time on both the parent bean and the persisted timestamp.
     *
     * @param startTime the start time.
     */
    @Override
    public void setStartTime(Date startTime) {
        super.setStartTime(startTime);
        this.startTimestamp = DateUtils.convertDateToTimestamp(startTime);
    }

    /**
     * Return the action end time, derived from the persisted end timestamp.
     *
     * @return the end time.
     */
    @Override
    public Date getEndTime() {
        return DateUtils.toDate(endTimestamp);
    }

    /**
     * Set the action end time on both the parent bean and the persisted timestamp.
     *
     * @param endTime the end time.
     */
    @Override
    public void setEndTime(Date endTime) {
        super.setEndTime(endTime);
        this.endTimestamp = DateUtils.convertDateToTimestamp(endTime);
    }
}