blob: b19dd50766e813da1183b6bdcb00aaa14bafe294 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.command.coord;
import java.io.IOException;
import java.io.StringReader;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.Job;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.coord.ElException;
import org.apache.oozie.coord.input.dependency.CoordInputDependency;
import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.StatusUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
/**
 * The command to check if an action's data input paths exist in the file system.
 * <p>
 * Only actions in {@code WAITING} state are processed. The command works as follows:
 * <ul>
 * <li>It re-queues itself until the action's nominal time has been reached.</li>
 * <li>It applies the job's {@code LAST_ONLY} / {@code NONE} execution-order skip rules.</li>
 * <li>It checks the action's pull input dependencies, taking push dependency state into account.</li>
 * <li>It then moves the action to {@code READY}, re-queues itself for another check, or queues a timeout.</li>
 * </ul>
 */
public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {

    /**
     * Property name of the tolerance, in minutes, applied to jobs with {@code NONE} execution order.
     * An action is skipped when the current time is more than this many minutes after its nominal time.
     */
    public static final String COORD_EXECUTION_NONE_TOLERANCE = "oozie.coord.execution.none.tolerance";

    private final String actionId;

    /**
     * Property name of command re-queue interval for coordinator action input check in
     * milliseconds.
     */
    public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
            + "coord.input.check.requeue.interval";

    /**
     * Property name of an additional re-queue delay, in seconds.
     * The delay is added to the re-queue interval when an input check did not change the action's dependencies.
     */
    public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL_ADDITIONAL_DELAY = Service.CONF_PREFIX
            + "coord.input.check.requeue.interval.additional.delay";

    private CoordinatorActionBean coordAction = null;
    private CoordinatorJobBean coordJob = null;
    private JPAService jpaService = null;
    private String jobId = null;

    /**
     * Creates an input-check command for one coordinator action.
     *
     * @param actionId coordinator action id; must not be empty
     * @param jobId coordinator job id, returned by {@link #getEntityKey()}
     */
    public CoordActionInputCheckXCommand(String actionId, String jobId) {
        super("coord_action_input", "coord_action_input", 1);
        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
        this.jobId = jobId;
    }

    @Override
    protected void setLogInfo() {
        LogUtils.setLogInfo(actionId);
    }

    /**
     * Runs one input-check pass for the loaded action.
     * <p>
     * If the nominal time is still in the future, the command re-queues itself to run at the nominal time.
     * Otherwise it does the following:
     * <ul>
     * <li>It applies the execution-order skip rules.</li>
     * <li>It checks the resolved (and, if needed, unresolved) pull dependencies.</li>
     * <li>If all dependencies are met, it moves the action to {@code READY}.</li>
     * <li>If not, it re-queues itself or, once the action has timed out, queues the timeout handling.</li>
     * </ul>
     *
     * @return always {@code null}
     * @throws CommandException if the check or the persistence of the action fails
     */
    @Override
    protected Void execute() throws CommandException {
        LOG.debug("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
        // this action should only get processed if current time > nominal time;
        // otherwise, requeue this action for delay execution;
        Date nominalTime = coordAction.getNominalTime();
        Date currentTime = new Date();
        if (nominalTime.compareTo(currentTime) > 0) {
            queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
                    nominalTime.getTime() - currentTime.getTime());
            updateCoordAction(coordAction, false);
            LOG.info("[" + actionId
                    + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
                    + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime));
            return null;
        }
        StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
        boolean isChangeInDependency = false;
        try {
            Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
            if (isSkippedByExecutionOrder(nominalTime, new Date())) {
                return null;
            }
            StringBuilder existList = new StringBuilder();
            StringBuilder nonExistList = new StringBuilder();
            CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies();
            CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies();
            String missingDependencies = coordPullInputDependency.getMissingDependencies();
            StringBuilder nonResolvedList = new StringBuilder();
            CoordCommandUtils.getResolvedList(missingDependencies, nonExistList, nonResolvedList);
            String firstMissingDependency = "";
            // For clarity regarding which is the missing dependency in synchronous order
            // instead of printing entire list, some of which, may be available
            if (nonExistList.length() > 0) {
                firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
            }
            LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " "
                    + nonResolvedList.toString());
            boolean status = false;
            try {
                status = checkResolvedInput(actionXml, existList, nonExistList, actionConf);
            }
            catch (Exception e) {
                // If some inputs were found before the failure, compute the change flag so the outer
                // handler's updateCoordAction() persists the dependencies discovered so far.
                if (existList.length() > 0) {
                    isChangeInDependency = isChangeInDependency(nonExistList, missingDependencies, nonResolvedList,
                            status);
                }
                throw e;
            }
            boolean isPushDependenciesMet = coordPushInputDependency.isDependencyMet();
            if (status && nonResolvedList.length() > 0) {
                // Unresolved (latest/future) inputs are only checked once push dependencies are met
                status = isPushDependenciesMet && checkUnResolvedInput(actionXml, actionConf);
            }
            coordAction.setLastModifiedTime(currentTime);
            coordAction.setActionXml(actionXml.toString());
            isChangeInDependency = isChangeInDependency(nonExistList, missingDependencies, nonResolvedList, status);
            if (status && isPushDependenciesMet) {
                moveCoordActionToReady(actionXml, actionConf, coordPullInputDependency, coordPushInputDependency);
            }
            else if (!isTimeout(currentTime)) {
                if (!status) {
                    // Back off by an extra delay only when this pass made no progress on the dependencies
                    long additionalDelay = isChangeInDependency ? 0
                            : ConfigurationService.getInt(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL_ADDITIONAL_DELAY)
                                    * 1000L;
                    queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
                            additionalDelay + getCoordInputCheckRequeueInterval());
                }
                updateCoordAction(coordAction, isChangeInDependency);
            }
            else {
                if (isPushDependenciesMet) {
                    queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
                }
                else {
                    // Let CoordPushDependencyCheckXCommand queue the timeout
                    queue(new CoordPushDependencyCheckXCommand(coordAction.getId()));
                }
                updateCoordAction(coordAction, isChangeInDependency);
            }
        }
        catch (AccessControlException e) {
            LOG.error("Permission error in ActionInputCheck", e);
            if (isTimeout(currentTime)) {
                LOG.debug("Queueing timeout command");
                Services.get().get(CallableQueueService.class)
                        .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
            }
            else {
                // Requeue InputCheckCommand for permission denied error with longer interval
                Services.get()
                        .get(CallableQueueService.class)
                        .queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
                                2 * getCoordInputCheckRequeueInterval());
            }
            updateCoordAction(coordAction, isChangeInDependency);
        }
        catch (Exception e) {
            if (isTimeout(currentTime)) {
                LOG.debug("Queueing timeout command");
                // XCommand.queue() will not work when there is a Exception
                Services.get().get(CallableQueueService.class)
                        .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
            }
            updateCoordAction(coordAction, isChangeInDependency);
            throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
        }
        return null;
    }

    /**
     * Applies the job's execution-order skip rules to this action.
     * When the action has missed its start window, this queues a {@link CoordActionSkipXCommand}.
     * <ul>
     * <li>{@code LAST_ONLY}: skip when {@code now} is after the next action's nominal time.</li>
     * <li>{@code NONE}: skip when {@code now} is more than {@link #COORD_EXECUTION_NONE_TOLERANCE} minutes
     * after this action's nominal time.</li>
     * </ul>
     *
     * @param nominalTime nominal time of this action
     * @param now the time at which the skip window is evaluated
     * @return true if a skip command was queued and the input check must stop
     * @throws Exception propagated from computing the skip window
     */
    private boolean isSkippedByExecutionOrder(Date nominalTime, Date now) throws Exception {
        if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.LAST_ONLY)) {
            Date nextNominalTime = CoordCommandUtils.computeNextNominalTime(coordJob, coordAction);
            if (nextNominalTime == null) {
                return false;
            }
            // If the current time is after the next action's nominal time, then we've passed the window where this
            // action should be started; so set it to SKIPPED
            if (now.after(nextNominalTime)) {
                LOG.info("LAST_ONLY execution: Preparing to skip action [{0}] because the current time [{1}] is later than "
                        + "the nominal time [{2}] of the next action", coordAction.getId(),
                        DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
                queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
                return true;
            }
            LOG.debug("LAST_ONLY execution: Not skipping action [{0}] because the current time [{1}] is earlier than "
                    + "the nominal time [{2}] of the next action", coordAction.getId(),
                    DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
            return false;
        }
        if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.NONE)) {
            // If the current time is after the nominal time of this action plus some tolerance,
            // then we've passed the window where this action should be started; so set it to SKIPPED
            Calendar cal = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
            cal.setTime(nominalTime);
            int tolerance = ConfigurationService.getInt(COORD_EXECUTION_NONE_TOLERANCE);
            cal.add(Calendar.MINUTE, tolerance);
            if (now.after(cal.getTime())) {
                LOG.info("NONE execution: Preparing to skip action [{0}] because the current time [{1}] is more than [{2}]"
                        + " minutes later than the nominal time [{3}] of the current action", coordAction.getId(),
                        DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(nominalTime));
                queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
                return true;
            }
            LOG.debug("NONE execution: Not skipping action [{0}] because the current time [{1}] is earlier than [{2}]"
                    + " minutes later than the nominal time [{3}] of the current action", coordAction.getId(),
                    DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(nominalTime));
        }
        return false;
    }

    /**
     * Asks the pull input dependency whether this check changed the action's recorded dependencies.
     * <p>
     * When the check did not succeed and unresolved dependencies remain, they are first appended to
     * {@code nonExistList}, separated by {@link CoordCommandUtils#RESOLVED_UNRESOLVED_SEPARATOR}.
     * Note that this mutates the caller's {@code nonExistList}.
     *
     * @param nonExistList dependencies found missing by this check; may be appended to
     * @param missingDependencies the missing-dependency string recorded before this check
     * @param nonResolvedList unresolved dependencies of the action
     * @param status whether the checked inputs were all available
     * @return the change flag reported by the action's pull input dependency
     * @throws IOException propagated from the pull input dependency
     */
    private boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies,
            StringBuilder nonResolvedList, boolean status) throws IOException {
        if (!status && nonResolvedList.length() > 0) {
            nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
        }
        return coordAction.getPullInputDependencies().isChangeInDependency(nonExistList, missingDependencies,
                nonResolvedList, status);
    }

    /**
     * Resolves the EL expressions in the action's workflow configuration without pull/push dependency context.
     *
     * @param actionXml action XML
     * @param actionConf action configuration
     * @param actionId coordinator action id
     * @return the pretty-printed resolved action XML
     * @throws Exception if parsing or EL evaluation fails
     */
    static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId)
            throws Exception {
        return resolveCoordConfiguration(actionXml, actionConf, actionId, null, null);
    }

    /**
     * Resolves the EL expressions in the property values of the action's workflow configuration.
     *
     * @param actionXml action XML (not modified)
     * @param actionConf action configuration
     * @param actionId coordinator action id
     * @param pullDependencies pull input dependencies made available to the EL evaluator; may be null
     * @param pushDependencies push input dependencies made available to the EL evaluator; may be null
     * @return the pretty-printed resolved action XML
     * @throws Exception if parsing or EL evaluation fails
     */
    static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId,
            CoordInputDependency pullDependencies, CoordInputDependency pushDependencies) throws Exception {
        Element eAction = XmlUtils.parseXml(actionXml.toString());
        ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, actionConf, actionId, pullDependencies,
                pushDependencies);
        materializeDataProperties(eAction, actionConf, eval);
        return XmlUtils.prettyPrint(eAction).toString();
    }

    /**
     * Returns whether the action has waited longer than its timeout.
     * <p>
     * Waiting time is counted in whole minutes (truncated) from the later of the action's nominal time and
     * creation time. A negative timeout disables the check.
     *
     * @param currentTime reference "now"
     * @return true if the action has timed out
     */
    private boolean isTimeout(Date currentTime) {
        long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
                .getCreatedTime().getTime()))
                / (60 * 1000);
        int timeOut = coordAction.getTimeOut();
        return (timeOut >= 0) && (waitingTime > timeOut);
    }

    /**
     * Persists the coordinator action, refreshing its last-modified time first.
     * <p>
     * The update depends on {@code isChangeInDependency}:
     * <ul>
     * <li>If true, the serialized pull dependencies are stored as the missing dependencies and the input-check
     * update query is run. An event is also generated when event handling is enabled and the action is not
     * {@code READY}.</li>
     * <li>If false, only the modified date is updated.</li>
     * </ul>
     * Nothing is written when no JPA service is loaded.
     *
     * @param coordAction action to persist
     * @param isChangeInDependency whether the action's dependencies changed
     * @throws CommandException wrapping any persistence failure
     */
    private void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
            throws CommandException {
        coordAction.setLastModifiedTime(new Date());
        if (jpaService != null) {
            try {
                if (isChangeInDependency) {
                    coordAction.setMissingDependencies(coordAction.getPullInputDependencies().serialize());
                    CoordActionQueryExecutor.getInstance().executeUpdate(
                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, coordAction);
                    if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) {
                        // since event is not to be generated unless action
                        // RUNNING via StartX
                        generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
                    }
                }
                else {
                    CoordActionQueryExecutor.getInstance().executeUpdate(
                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction);
                }
            }
            catch (Exception jex) {
                throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
            }
        }
    }

    /**
     * Reads the re-queue interval for the coordinator input check command from the
     * Oozie configuration provided by {@link ConfigurationService}.
     *
     * @return re-queue interval in ms
     */
    public long getCoordInputCheckRequeueInterval() {
        return ConfigurationService.getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL);
    }

    /**
     * Checks whether all resolved input paths of the action exist.
     *
     * @param actionXml action xml (not used by this implementation)
     * @param existList receives the paths found to exist
     * @param nonExistList receives the paths found not to exist
     * @param conf action configuration (not used by this implementation)
     * @return true if all input paths exist
     * @throws Exception thrown if unable to check an input path
     */
    protected boolean checkResolvedInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
            Configuration conf) throws Exception {
        return coordAction.getPullInputDependencies().checkPullMissingDependencies(coordAction, existList,
                nonExistList);
    }

    /**
     * Checks the unresolved (latest/future) inputs of the given action.
     * <p>
     * If all of them exist, {@code actionXml} is replaced in place with the updated action XML.
     *
     * @param coordAction the coord action
     * @param actionXml the action xml; rewritten in place on success
     * @param conf the action configuration
     * @return true if all unresolved inputs exist
     * @throws Exception if parsing or checking fails, or latest()/future() is used in an output event
     */
    protected boolean checkUnResolvedInput(CoordinatorActionBean coordAction, StringBuilder actionXml,
            Configuration conf) throws Exception {
        Element eAction = XmlUtils.parseXml(actionXml.toString());
        LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
        boolean allExist = checkUnresolvedInstances(coordAction, eAction, conf);
        if (allExist) {
            actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
        }
        return allExist;
    }

    /**
     * Checks the unresolved (latest/future) inputs of the action loaded by this command.
     *
     * @param actionXml the action xml; rewritten in place on success
     * @param conf the action configuration
     * @return true if all unresolved inputs exist
     * @throws Exception if parsing or checking fails
     */
    protected boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception {
        return checkUnResolvedInput(coordAction, actionXml, conf);
    }

    /**
     * Evaluates the EL expressions in every {@code property/value} of the action's workflow
     * {@code configuration}, modifying {@code eAction} in place.
     * <p>
     * Examples are dataIn(&lt;DS&gt;) and dataOut(&lt;DS&gt;), which expand into the list of files the action needs.
     * Expects the {@code action/workflow} elements to be present.
     *
     * @param eAction action element; modified in place
     * @param conf action configuration
     * @param eval EL evaluator used to resolve the values
     * @throws Exception thrown if failed to resolve data properties
     */
    @SuppressWarnings("unchecked")
    static void materializeDataProperties(Element eAction, Configuration conf, ELEvaluator eval) throws Exception {
        Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
                eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
        if (configElem != null) {
            for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
                resolveTagContents("value", propElem, eval);
            }
        }
    }

    /**
     * Replaces the text of child {@code tagName} of {@code elem} with its EL-evaluated form.
     * <p>
     * If {@code elem} is null, nothing happens. If {@code elem} has no such child, a warning is logged.
     *
     * @param tagName tag name
     * @param elem the "property" element
     * @param eval el functions evaluator
     * @throws Exception thrown if unable to resolve tag value
     */
    private static void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
        if (elem == null) {
            return;
        }
        Element tagElem = elem.getChild(tagName, elem.getNamespace());
        if (tagElem != null) {
            String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
            tagElem.removeContent();
            tagElem.addContent(updated);
        }
        else {
            XLog.getLog(CoordActionInputCheckXCommand.class).warn(" Value NOT FOUND " + tagName);
        }
    }

    /**
     * Resolves the unresolved data input instances of the action.
     * It also rejects unresolved instances in output events.
     *
     * @param coordAction the coord action
     * @param eAction action element
     * @param actionConf action configuration
     * @return the result of resolving the unresolved input instances
     * @throws Exception if resolution fails
     * @throws CommandException (E1006) if any data-out contains unresolved latest()/future() instances
     */
    @SuppressWarnings("unchecked")
    private boolean checkUnresolvedInstances(CoordinatorActionBean coordAction, Element eAction,
            Configuration actionConf) throws Exception {
        boolean ret = coordAction.getPullInputDependencies().checkUnresolved(coordAction, eAction);
        // Using latest() or future() in output-event is not intuitive.
        // We need to make sure, this assumption is correct.
        Element outputList = eAction.getChild("output-events", eAction.getNamespace());
        if (outputList != null) {
            for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
                if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()) != null) {
                    throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
                            " not permitted in output-event ");
                }
            }
        }
        return ret;
    }

    /**
     * Resolves the coordinator configuration and moves the action to {@code READY}.
     * It then runs {@link CoordActionReadyXCommand} for the job.
     * <p>
     * If EL resolution fails with {@link ElException}, the action is persisted as {@code FAILED} and the exception
     * is rethrown.
     *
     * @param actionXml action XML; replaced in place with the resolved XML
     * @param actionConf action configuration
     * @param coordPullInputDependency pull input dependencies passed to the EL evaluator
     * @param coordPushInputDependency push input dependencies passed to the EL evaluator
     * @throws Exception if resolution, persistence or the ready command fails
     */
    private void moveCoordActionToReady(StringBuilder actionXml, Configuration actionConf,
            CoordInputDependency coordPullInputDependency, CoordInputDependency coordPushInputDependency)
            throws Exception {
        String newActionXml = null;
        try {
            newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId, coordPullInputDependency,
                    coordPushInputDependency);
        }
        catch (ElException e) {
            coordAction.setStatus(CoordinatorAction.Status.FAILED);
            updateCoordAction(coordAction, true);
            throw e;
        }
        actionXml.replace(0, actionXml.length(), newActionXml);
        coordAction.setActionXml(actionXml.toString());
        coordAction.setStatus(CoordinatorAction.Status.READY);
        updateCoordAction(coordAction, true);
        new CoordActionReadyXCommand(coordAction.getJobId()).call();
    }

    /**
     * getting the error code of the coord action. (used mainly for unit testing)
     *
     * @return error code, or null if no action is loaded
     */
    protected String getCoordActionErrorCode() {
        if (coordAction != null) {
            return coordAction.getErrorCode();
        }
        return null;
    }

    /**
     * getting the error message of the coord action. (used mainly for unit testing)
     *
     * @return error message, or null if no action is loaded
     */
    protected String getCoordActionErrorMsg() {
        if (coordAction != null) {
            return coordAction.getErrorMessage();
        }
        return null;
    }

    @Override
    public String getEntityKey() {
        return this.jobId;
    }

    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * Loads the coordinator action and its job.
     *
     * @throws CommandException (E0605) if the action does not exist, or wrapping a JPA failure
     */
    @Override
    protected void loadState() throws CommandException {
        if (jpaService == null) {
            jpaService = Services.get().get(JPAService.class);
        }
        try {
            coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
            if (coordAction != null) {
                coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_INPUT_CHECK,
                        coordAction.getJobId());
            }
            else {
                throw new CommandException(ErrorCode.E0605, actionId);
            }
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
        LogUtils.setLogInfo(coordAction);
    }

    /**
     * Verifies the preconditions for running the input check.
     * <p>
     * The action must be {@code WAITING}. The job must either be accepted by
     * {@link StatusUtils#getStatusForCoordActionInputCheck}, or be in {@code RUNNING}, {@code RUNNINGWITHERROR},
     * {@code PAUSED} or {@code PAUSEDWITHERROR} state.
     *
     * @throws PreconditionException (E1100) if a precondition does not hold
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
                    + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
                    + coordAction.getStatus());
        }
        // if eligible to do action input check when running with backward support is true
        if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
            return;
        }
        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR
                && coordJob.getStatus() != Job.Status.PAUSED
                && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
            throw new PreconditionException(
                    ErrorCode.E1100, "[" + actionId + "]::CoordActionInputCheck:: Ignoring action." +
                    " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
                    + coordJob.getStatus());
        }
    }

    @Override
    public String getKey() {
        return getName() + "_" + actionId;
    }
}