// blob: 835777c8ddf16d6bafc616bdc06d846025efcff5
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.command.bundle;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.Job.Status;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.StatusTransitXCommand;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.StatusUtils;
/**
* BundleStatusTransitXCommand updates a bundle job's status according to the status of its child actions. If no child
* action has its pending flag set (job done), the job's pending flag is reset to 0. If all child actions have succeeded,
* the job's status is set to SUCCEEDED.
*/
public class BundleStatusTransitXCommand extends StatusTransitXCommand {
    /** Id of the bundle job whose status is recomputed by this command. */
    private final String jobId;
    /** Un-ignored actions of the bundle, populated by {@link #loadState()}. */
    private List<BundleActionBean> bundleActions;
    /** The bundle job bean, populated by {@link #loadState()}. */
    private BundleJobBean bundleJob;
    /** Set to true by {@link #loadState()} when at least one bundle action has its pending flag set. */
    private boolean foundPending;
    /** Number of loaded bundle actions per status; a status with no actions has no entry. */
    private final HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();

    /**
     * Creates the command for the given bundle job.
     *
     * @param id the bundle job id
     */
    public BundleStatusTransitXCommand(String id) {
        super("bundle_status_transit", "bundle_status_transit", 0);
        this.jobId = id;
    }

    /**
     * @return the bundle job id this command operates on
     */
    @Override
    public String getEntityKey() {
        return jobId;
    }

    /**
     * Loads the bundle job and its actions, tallies the actions per status and records whether any action is
     * still pending.
     * <p>
     * If an action has no coordinator id and is FAILED or KILLED, the bundle is killed through
     * {@link BundleKillXCommand}, reloaded, marked FAILED and persisted.
     *
     * @throws CommandException with {@link ErrorCode#E1322} if a JPA query or update fails
     */
    @Override
    protected void loadState() throws CommandException {
        try {
            bundleJob = BundleJobQueryExecutor.getInstance().get(
                    BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME, jobId);
            bundleActions = BundleActionQueryExecutor.getInstance().getList(
                    BundleActionQuery.GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE, jobId);
            for (BundleActionBean bAction : bundleActions) {
                final Job.Status actionStatus = bAction.getStatus();
                // getActionStatusCount() yields 0 for a status not seen yet, so the first occurrence is stored as 1.
                bundleActionStatus.put(actionStatus, getActionStatusCount(actionStatus) + 1);
                // NOTE(review): a null coordinator id on a FAILED/KILLED action presumably means the coordinator
                // was never created -- confirm. In that case the whole bundle is killed and flagged FAILED.
                if (bAction.getCoordId() == null
                        && (actionStatus == Job.Status.FAILED || actionStatus == Job.Status.KILLED)) {
                    new BundleKillXCommand(jobId).call();
                    // Reload the job after the kill command has run, then override its status with FAILED.
                    bundleJob = BundleJobQueryExecutor.getInstance().get(
                            BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME, jobId);
                    bundleJob.setStatus(Job.Status.FAILED);
                    bundleJob.setLastModifiedTime(new Date());
                    BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS,
                            bundleJob);
                }
                if (bAction.isPending()) {
                    LOG.debug(bAction + " has pending flag set");
                    foundPending = true;
                }
            }
            LogUtils.setLogInfo(bundleJob);
        }
        catch (JPAExecutorException e) {
            throw new CommandException(ErrorCode.E1322, e);
        }
    }

    /**
     * Returns the status computed by the superclass; when that is null and some (but not all) actions are still in
     * PREP with nothing pending, returns the running status derived from the action tally instead.
     *
     * @return the new job status, or null if neither the superclass nor the PREP check produced one
     * @throws CommandException propagated from the superclass computation
     */
    @Override
    protected Job.Status getJobStatus() throws CommandException {
        Job.Status jobStatus = super.getJobStatus();
        if (jobStatus == null && isPrepRunningState()) {
            return getPrepRunningStatus();
        }
        return jobStatus;
    }

    /**
     * @return true if no action is pending and every action is SUCCEEDED, FAILED, KILLED or DONEWITHERROR
     */
    @Override
    protected boolean isTerminalState() {
        return !foundPending
                && bundleActions.size() == getActionStatusCount(Job.Status.SUCCEEDED)
                        + getActionStatusCount(Job.Status.FAILED) + getActionStatusCount(Job.Status.KILLED)
                        + getActionStatusCount(Job.Status.DONEWITHERROR);
    }

    /**
     * Computes the terminal status of the bundle from the action tally.
     *
     * @return KILLED if the bundle is already KILLED; otherwise SUCCEEDED, KILLED or FAILED when all actions share
     *         that status, and DONEWITHERROR for any other mix
     */
    @Override
    protected Job.Status getTerminalStatus() {
        // If all bundle actions are done and the bundle is killed, then don't change the status.
        if (bundleJob.getStatus().equals(Job.Status.KILLED)) {
            return Job.Status.KILLED;
        }
        final int totalActions = bundleActions.size();
        if (totalActions == getActionStatusCount(Job.Status.SUCCEEDED)) {
            // All the bundle actions succeeded, so the bundle job succeeded.
            return Job.Status.SUCCEEDED;
        }
        if (totalActions == getActionStatusCount(Job.Status.KILLED)) {
            // All the bundle actions are KILLED, so the bundle job is KILLED.
            return Job.Status.KILLED;
        }
        if (totalActions == getActionStatusCount(Job.Status.FAILED)) {
            // All the bundle actions are FAILED, so the bundle job is FAILED.
            return Job.Status.FAILED;
        }
        return Job.Status.DONEWITHERROR;
    }

    /**
     * @return true if the bundle is PAUSED/PAUSEDWITHERROR with a pause time set, or if the bottom-up computation
     *         from the actions yields a paused status
     */
    @Override
    protected boolean isPausedState() {
        // If the bundle is paused then the pause time will be set.
        // If bundleJob.getPauseTime() is not set, the status has to be computed from bottom-up.
        // The status checks are grouped explicitly: '&&' binds tighter than '||', so without the parentheses the
        // pause-time check would only guard PAUSEDWITHERROR.
        if ((bundleJob.getStatus() == Job.Status.PAUSED || bundleJob.getStatus() == Job.Status.PAUSEDWITHERROR)
                && bundleJob.getPauseTime() != null) {
            return true;
        }
        return getBottomUpPauseStatus() != null;
    }

    /**
     * @return for a bundle already PAUSED/PAUSEDWITHERROR: PAUSEDWITHERROR if any action is terminated with error
     *         or is in a *WITHERROR state, else PAUSED; for any other bundle status the bottom-up pause status
     *         (possibly null)
     */
    @Override
    protected Job.Status getPausedState() {
        if (bundleJob.getStatus() == Job.Status.PAUSED || bundleJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
            if (hasTerminatedActions() || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
                    || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
                    || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
                return Job.Status.PAUSEDWITHERROR;
            }
            return Job.Status.PAUSED;
        }
        return getBottomUpPauseStatus();
    }

    /**
     * @return true if the bundle is SUSPENDED/SUSPENDEDWITHERROR with a suspended timestamp set, or if the
     *         bottom-up computation from the actions yields a suspended status
     */
    @Override
    protected boolean isSuspendedState() {
        // If the bundle is suspended then the timestamp will be set.
        // If bundleJob.getSuspendedTimestamp() is not set, the status has to be computed from bottom-up.
        if ((bundleJob.getStatus() == Job.Status.SUSPENDED || bundleJob.getStatus() == Job.Status.SUSPENDEDWITHERROR)
                && bundleJob.getSuspendedTimestamp() != null) {
            return true;
        }
        return getBottomUpSuspendedState() != null;
    }

    /**
     * @return for a bundle already SUSPENDED/SUSPENDEDWITHERROR: SUSPENDEDWITHERROR if any action is KILLED,
     *         FAILED, DONEWITHERROR, SUSPENDEDWITHERROR or PAUSEDWITHERROR, else SUSPENDED; for any other bundle
     *         status the bottom-up suspended status (possibly null)
     */
    @Override
    protected Job.Status getSuspendedStatus() {
        if (bundleJob.getStatus() == Job.Status.SUSPENDED || bundleJob.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
            if (hasTerminatedActions() || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
                    || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
                return Job.Status.SUSPENDEDWITHERROR;
            }
            return Job.Status.SUSPENDED;
        }
        return getBottomUpSuspendedState();
    }

    /**
     * @return always true; the actual decision is made in {@link #getRunningState()}
     */
    @Override
    protected boolean isRunningState() {
        return true;
    }

    /**
     * @return null while the bundle is still in PREP; otherwise RUNNING or RUNNINGWITHERROR based on the action
     *         tally
     */
    @Override
    protected Status getRunningState() {
        if (bundleJob.getStatus() != Job.Status.PREP) {
            return getRunningStatus(bundleActionStatus);
        }
        else {
            return null;
        }
    }

    /**
     * Applies the new status, the last-modified time and the pending flag to the bundle job and persists them.
     *
     * @param bundleStatus the status to set; it is first passed through
     *        {@link StatusUtils#getStatusIfBackwardSupportTrue}
     * @throws JPAExecutorException if the update query fails
     */
    @Override
    protected void updateJobStatus(Job.Status bundleStatus) throws JPAExecutorException {
        LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus + "' from '" + bundleJob.getStatus() + "'");
        // Named differently from the jobId field so the field is not shadowed halfway through the method.
        final String bundleId = bundleJob.getId();
        // Update the Bundle Job
        // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
        // PAUSEDWITHERROR is not supported
        bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
        bundleJob.setLastModifiedTime(new Date());
        if (foundPending) {
            bundleJob.setPending();
            LOG.info("Bundle job [" + bundleId + "] Pending set to TRUE");
        }
        else {
            bundleJob.resetPending();
            LOG.info("Bundle job [" + bundleId + "] Pending set to FALSE");
        }
        BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME,
                bundleJob);
    }

    /**
     * Bottom up; derives the paused status of the bundle from its child actions.
     *
     * @return PAUSED if every action is PAUSED; PAUSEDWITHERROR if at least one action is PAUSEDWITHERROR and every
     *         action is PAUSED or PAUSEDWITHERROR; null otherwise
     */
    private Job.Status getBottomUpPauseStatus() {
        if (bundleActionStatus.containsKey(Job.Status.PAUSED)
                && bundleActions.size() == getActionStatusCount(Job.Status.PAUSED)) {
            return Job.Status.PAUSED;
        }
        else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)
                && bundleActions.size() == getActionStatusCount(Job.Status.PAUSED)
                        + getActionStatusCount(Job.Status.PAUSEDWITHERROR)) {
            return Job.Status.PAUSEDWITHERROR;
        }
        return null;
    }

    /**
     * Bottom up; derives the suspended status of the bundle from its child actions.
     *
     * @return SUSPENDED if every action is SUSPENDED or SUCCEEDED; SUSPENDEDWITHERROR if every action is
     *         SUSPENDEDWITHERROR, SUSPENDED, SUCCEEDED, KILLED, FAILED or DONEWITHERROR; null otherwise or when the
     *         entry condition below does not hold
     */
    private Job.Status getBottomUpSuspendedState() {
        // NOTE(review): the pending check only guards the SUSPENDED case; a SUSPENDEDWITHERROR action enters the
        // branch even while actions are pending. The parentheses make the existing evaluation order explicit --
        // confirm whether !foundPending was meant to guard both cases.
        if ((!foundPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED))
                || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
            if (bundleActions.size() == getActionStatusCount(Job.Status.SUSPENDED)
                    + getActionStatusCount(Job.Status.SUCCEEDED)) {
                return Job.Status.SUSPENDED;
            }
            else if (bundleActions.size() == getActionStatusCount(Job.Status.SUSPENDEDWITHERROR)
                    + getActionStatusCount(Job.Status.SUSPENDED) + getActionStatusCount(Job.Status.SUCCEEDED)
                    + getActionStatusCount(Job.Status.KILLED) + getActionStatusCount(Job.Status.FAILED)
                    + getActionStatusCount(Job.Status.DONEWITHERROR)) {
                return Job.Status.SUSPENDEDWITHERROR;
            }
        }
        return null;
    }

    /**
     * @return true if at least one action is KILLED, FAILED or DONEWITHERROR
     */
    private boolean hasTerminatedActions() {
        return bundleActionStatus.containsKey(Job.Status.KILLED) || bundleActionStatus.containsKey(Job.Status.FAILED)
                || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR);
    }

    /**
     * @return true if nothing is pending and some, but not all, actions are in PREP
     */
    private boolean isPrepRunningState() {
        return !foundPending && bundleActionStatus.containsKey(Job.Status.PREP)
                && bundleActions.size() > getActionStatusCount(Job.Status.PREP);
    }

    /**
     * @return the running status derived from the action tally
     */
    private Status getPrepRunningStatus() {
        return getRunningStatus(bundleActionStatus);
    }

    /**
     * @param status the action status to look up
     * @return the number of loaded actions with the given status, or 0 if there are none
     */
    private int getActionStatusCount(final Job.Status status) {
        final Integer count = bundleActionStatus.get(status);
        return count == null ? 0 : count;
    }

    /**
     * @param actionStatus tally of actions per status
     * @return RUNNINGWITHERROR if any action is FAILED, KILLED, DONEWITHERROR or RUNNINGWITHERROR; RUNNING otherwise
     */
    private Job.Status getRunningStatus(HashMap<Job.Status, Integer> actionStatus) {
        if (actionStatus.containsKey(Job.Status.FAILED) || actionStatus.containsKey(Job.Status.KILLED)
                || actionStatus.containsKey(Job.Status.DONEWITHERROR)
                || actionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
            return Job.Status.RUNNINGWITHERROR;
        }
        return Job.Status.RUNNING;
    }

    /**
     * No precondition is checked for this command.
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        // Intentionally empty.
    }
}