| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.oozie; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.sql.Timestamp; |
| import java.util.Date; |
| |
| import javax.persistence.Basic; |
| import javax.persistence.Column; |
| import javax.persistence.ColumnResult; |
| import javax.persistence.Entity; |
| import javax.persistence.Lob; |
| import javax.persistence.NamedNativeQueries; |
| import javax.persistence.NamedNativeQuery; |
| import javax.persistence.NamedQueries; |
| import javax.persistence.NamedQuery; |
| import javax.persistence.SqlResultSetMapping; |
| |
| import org.apache.hadoop.io.Writable; |
| import org.apache.oozie.client.CoordinatorAction; |
| import org.apache.oozie.client.rest.JsonCoordinatorAction; |
| import org.apache.oozie.util.DateUtils; |
| import org.apache.oozie.util.WritableUtils; |
| import org.apache.openjpa.persistence.jdbc.Index; |
| |
| @SqlResultSetMapping( |
| name = "CoordActionJobIdLmt", |
| columns = {@ColumnResult(name = "job_id"), |
| @ColumnResult(name = "min_lmt")}) |
| |
| @Entity |
| @NamedQueries({ |
| |
| @NamedQuery(name = "UPDATE_COORD_ACTION", query = "update CoordinatorActionBean w set w.actionNumber = :actionNumber, w.actionXml = :actionXml, w.consoleUrl = :consoleUrl, w.createdConf = :createdConf, w.errorCode = :errorCode, w.errorMessage = :errorMessage, w.externalStatus = :externalStatus, w.missingDependencies = :missingDependencies, w.runConf = :runConf, w.timeOut = :timeOut, w.trackerUri = :trackerUri, w.type = :type, w.createdTimestamp = :createdTime, w.externalId = :externalId, w.jobId = :jobId, w.lastModifiedTimestamp = :lastModifiedTime, w.nominalTimestamp = :nominalTime, w.slaXml = :slaXml, w.status = :status where w.id = :id"), |
| |
| @NamedQuery(name = "UPDATE_COORD_ACTION_MIN", query = "update CoordinatorActionBean w set w.actionXml = :actionXml, w.missingDependencies = :missingDependencies, w.lastModifiedTimestamp = :lastModifiedTime, w.status = :status where w.id = :id"), |
| // Query to update the action status, pending status and last modified time stamp of a Coordinator action |
| @NamedQuery(name = "UPDATE_COORD_ACTION_STATUS_PENDING_TIME", query = "update CoordinatorActionBean w set w.status =:status, w.pending =:pending, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"), |
| |
| @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status= 'KILLED')"), |
| |
| @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from CoordinatorActionBean w"), |
| |
| @NamedQuery(name = "GET_COMPLETED_ACTIONS_OLDER_THAN", query = "select OBJECT(a) from CoordinatorActionBean a where a.createdTimestamp < :createdTime and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status = 'KILLED')"), |
| |
| @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"), |
| |
| @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select OBJECT(a) from CoordinatorActionBean a where a.externalId = :externalId"), |
| |
| @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp"), |
| |
| @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp desc"), |
| |
| @NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'RUNNING' OR a.status='SUBMITTED')"), |
| |
| @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"), |
| |
| @NamedQuery(name = "GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'WAITING'"), |
| |
| @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND (a.status = 'SUSPENDED' OR a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED')"), |
| |
| @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_STATUS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND a.status = :status"), |
| |
| @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId"), |
| |
| // Query to retrieve action id, action status, pending status and external Id of not completed Coordinator actions |
| @NamedQuery(name = "GET_COORD_ACTIONS_NOT_COMPLETED", query = "select a.id, a.status, a.pending, a.externalId from CoordinatorActionBean a where a.jobId = :jobId AND a.status <> 'FAILED' AND a.status <> 'TIMEDOUT' AND a.status <> 'SUCCEEDED' AND a.status <> 'KILLED'"), |
| |
| // Query to retrieve action id, action status, pending status and external Id of running Coordinator actions |
| @NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id, a.status, a.pending, a.externalId from CoordinatorActionBean a where a.jobId = :jobId and a.status = 'RUNNING'"), |
| |
| // Query to retrieve action id, action status, pending status and external Id of suspended Coordinator actions |
| @NamedQuery(name = "GET_COORD_ACTIONS_SUSPENDED", query = "select a.id, a.status, a.pending, a.externalId from CoordinatorActionBean a where a.jobId = :jobId and a.status = 'SUSPENDED'"), |
| |
| // Query to retrieve count of Coordinator actions which are pending |
| @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending > 0"), |
| |
| // Query to retrieve status of Coordinator actions which are not pending |
| @NamedQuery(name = "GET_COORD_ACTIONS_STATUS_BY_PENDING_FALSE", query = "select a.status from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0"), |
| |
| @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.actionNumber = :actionNumber"), |
| |
| @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select OBJECT(w) from CoordinatorActionBean w where w.lastModifiedTimestamp >= :lastModifiedTime"), |
| |
| @NamedQuery(name = "GET_RUNNING_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'RUNNING'"), |
| |
| @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select OBJECT(a) from CoordinatorActionBean a where a.status = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"), |
| |
| @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select OBJECT(a) from CoordinatorActionBean a where (a.status = 'WAITING' OR a.status = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"), |
| |
| @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select OBJECT(a) from CoordinatorActionBean a where a.pending > 0 AND (a.status = 'SUSPENDED' OR a.status = 'KILLED' OR a.status = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"), |
| |
| @NamedQuery(name = "GET_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), |
| |
| @NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.nominalTimestamp = :nominalTime"), |
| |
| @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w")}) |
| |
| @NamedNativeQueries({ |
| |
| @NamedNativeQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.job_id as job_id, MIN(a.last_modified_time) as min_lmt from COORD_ACTIONS a where a.status = 'READY' GROUP BY a.job_id HAVING MIN(a.last_modified_time) < ?", resultSetMapping = "CoordActionJobIdLmt") |
| }) |
| public class CoordinatorActionBean extends JsonCoordinatorAction implements |
| Writable { |
| @Basic |
| @Index |
| @Column(name = "job_id") |
| private String jobId; |
| |
| @Basic |
| @Index |
| @Column(name = "status") |
| private String status = null; |
| |
| @Basic |
| @Column(name = "nominal_time") |
| private java.sql.Timestamp nominalTimestamp = null; |
| |
| @Basic |
| @Index |
| @Column(name = "last_modified_time") |
| private java.sql.Timestamp lastModifiedTimestamp = null; |
| |
| @Basic |
| @Index |
| @Column(name = "created_time") |
| private java.sql.Timestamp createdTimestamp = null; |
| |
| @Basic |
| @Index |
| @Column(name = "rerun_time") |
| private java.sql.Timestamp rerunTimestamp = null; |
| |
| @Basic |
| @Index |
| @Column(name = "external_id") |
| private String externalId; |
| |
| @Column(name = "sla_xml") |
| @Lob |
| private String slaXml = null; |
| |
| @Basic |
| @Column(name = "pending") |
| private int pending = 0; |
| |
| public CoordinatorActionBean() { |
| } |
| |
| /** |
| * Serialize the coordinator bean to a data output. |
| * |
| * @param dataOutput data output. |
| * @throws IOException thrown if the coordinator bean could not be |
| * serialized. |
| */ |
| @Override |
| public void write(DataOutput dataOutput) throws IOException { |
| WritableUtils.writeStr(dataOutput, getJobId()); |
| WritableUtils.writeStr(dataOutput, getType()); |
| WritableUtils.writeStr(dataOutput, getId()); |
| WritableUtils.writeStr(dataOutput, getCreatedConf()); |
| WritableUtils.writeStr(dataOutput, getStatus().toString()); |
| dataOutput.writeInt(getActionNumber()); |
| WritableUtils.writeStr(dataOutput, getRunConf()); |
| WritableUtils.writeStr(dataOutput, getExternalStatus()); |
| WritableUtils.writeStr(dataOutput, getTrackerUri()); |
| WritableUtils.writeStr(dataOutput, getConsoleUrl()); |
| WritableUtils.writeStr(dataOutput, getErrorCode()); |
| WritableUtils.writeStr(dataOutput, getErrorMessage()); |
| dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1); |
| dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1); |
| } |
| |
| /** |
| * Deserialize a coordinator bean from a data input. |
| * |
| * @param dataInput data input. |
| * @throws IOException thrown if the workflow bean could not be |
| * deserialized. |
| */ |
| @Override |
| public void readFields(DataInput dataInput) throws IOException { |
| setJobId(WritableUtils.readStr(dataInput)); |
| setType(WritableUtils.readStr(dataInput)); |
| setId(WritableUtils.readStr(dataInput)); |
| setCreatedConf(WritableUtils.readStr(dataInput)); |
| setStatus(CoordinatorAction.Status.valueOf(WritableUtils.readStr(dataInput))); |
| setActionNumber(dataInput.readInt()); |
| setRunConf(WritableUtils.readStr(dataInput)); |
| setExternalStatus(WritableUtils.readStr(dataInput)); |
| setTrackerUri(WritableUtils.readStr(dataInput)); |
| setConsoleUrl(WritableUtils.readStr(dataInput)); |
| setErrorCode(WritableUtils.readStr(dataInput)); |
| setErrorMessage(WritableUtils.readStr(dataInput)); |
| long d = dataInput.readLong(); |
| if (d != -1) { |
| setCreatedTime(new Date(d)); |
| } |
| d = dataInput.readLong(); |
| if (d != -1) { |
| setLastModifiedTime(new Date(d)); |
| } |
| } |
| |
| @Override |
| public String getJobId() { |
| return this.jobId; |
| } |
| |
| @Override |
| public void setJobId(String id) { |
| super.setJobId(id); |
| this.jobId = id; |
| } |
| |
| @Override |
| public Status getStatus() { |
| return Status.valueOf(status); |
| } |
| |
| @Override |
| public void setStatus(Status status) { |
| super.setStatus(status); |
| this.status = status.toString(); |
| } |
| |
| @Override |
| public void setCreatedTime(Date createdTime) { |
| this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime); |
| super.setCreatedTime(createdTime); |
| } |
| |
| public void setRerunTime(Date rerunTime) { |
| this.rerunTimestamp = DateUtils.convertDateToTimestamp(rerunTime); |
| } |
| |
| @Override |
| public void setNominalTime(Date nominalTime) { |
| this.nominalTimestamp = DateUtils.convertDateToTimestamp(nominalTime); |
| super.setNominalTime(nominalTime); |
| } |
| |
| @Override |
| public void setLastModifiedTime(Date lastModifiedTime) { |
| this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime); |
| super.setLastModifiedTime(lastModifiedTime); |
| } |
| |
| @Override |
| public Date getCreatedTime() { |
| return DateUtils.toDate(createdTimestamp); |
| } |
| |
| public Timestamp getCreatedTimestamp() { |
| return createdTimestamp; |
| } |
| |
| public Date getRerunTime() { |
| return DateUtils.toDate(rerunTimestamp); |
| } |
| |
| public Timestamp getRerunTimestamp() { |
| return rerunTimestamp; |
| } |
| |
| @Override |
| public Date getLastModifiedTime() { |
| return DateUtils.toDate(lastModifiedTimestamp); |
| } |
| |
| public Timestamp getLastModifiedTimestamp() { |
| return lastModifiedTimestamp; |
| } |
| |
| @Override |
| public Date getNominalTime() { |
| return DateUtils.toDate(nominalTimestamp); |
| } |
| |
| public Timestamp getNominalTimestamp() { |
| return nominalTimestamp; |
| } |
| |
| @Override |
| public String getExternalId() { |
| return externalId; |
| } |
| |
| @Override |
| public void setExternalId(String externalId) { |
| super.setExternalId(externalId); |
| this.externalId = externalId; |
| } |
| |
| public String getSlaXml() { |
| return slaXml; |
| } |
| |
| public void setSlaXml(String slaXml) { |
| this.slaXml = slaXml; |
| } |
| |
| /** |
| * @return true if in terminal status |
| */ |
| public boolean isTerminalStatus() { |
| boolean isTerminal = true; |
| switch (getStatus()) { |
| case WAITING: |
| case READY: |
| case SUBMITTED: |
| case RUNNING: |
| case SUSPENDED: |
| isTerminal = false; |
| break; |
| default: |
| isTerminal = true; |
| break; |
| } |
| return isTerminal; |
| } |
| |
| /** |
| * Set some actions are in progress for particular coordinator action. |
| * |
| * @param pending set pending to true |
| */ |
| public void setPending(int pending) { |
| this.pending = pending; |
| } |
| |
| /** |
| * increment pending and return it |
| * |
| * @return pending |
| */ |
| public int incrementAndGetPending() { |
| this.pending++; |
| return pending; |
| } |
| |
| /** |
| * decrement pending and return it |
| * |
| * @return pending |
| */ |
| public int decrementAndGetPending() { |
| this.pending = Math.max(this.pending - 1, 0); |
| return pending; |
| } |
| |
| /** |
| * Get some actions are in progress for particular bundle action. |
| * |
| * @return pending |
| */ |
| public int getPending() { |
| return this.pending; |
| } |
| |
| /** |
| * Return if the action is pending. |
| * |
| * @return if the action is pending. |
| */ |
| public boolean isPending() { |
| return pending > 0 ? true : false; |
| } |
| } |