blob: 223c1528c8ab4429614eb35ce5ccfcfc16512488 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.command.coord;
import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.util.Date;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.coord.ElException;
import org.apache.oozie.coord.input.dependency.CoordInputDependency;
import org.apache.oozie.dependency.ActionDependency;
import org.apache.oozie.dependency.DependencyChecker;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.RecoveryService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.StatusUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.DateUtils;
public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> {
protected String actionId;
protected JPAService jpaService = null;
protected CoordinatorActionBean coordAction = null;
protected CoordinatorJobBean coordJob = null;
/**
* Property name of command re-queue interval for coordinator push check in
* milliseconds.
*/
public static final String CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
+ "coord.push.check.requeue.interval";
private boolean registerForNotification;
private boolean removeAvailDependencies;
public CoordPushDependencyCheckXCommand(String actionId) {
this(actionId, false, true);
}
public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification) {
this(actionId, registerForNotification, !registerForNotification);
}
public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification,
boolean removeAvailDependencies) {
super("coord_push_dep_check", "coord_push_dep_check", 0);
this.actionId = actionId;
this.registerForNotification = registerForNotification;
this.removeAvailDependencies = removeAvailDependencies;
}
protected CoordPushDependencyCheckXCommand(String actionName, String actionId) {
super(actionName, actionName, 0);
this.actionId = actionId;
}
@Override
protected void setLogInfo() {
LogUtils.setLogInfo(actionId);
}
@Override
protected Void execute() throws CommandException {
// this action should only get processed if current time > nominal time;
// otherwise, requeue this action for delay execution;
Date nominalTime = coordAction.getNominalTime();
Date currentTime = new Date();
if (nominalTime.compareTo(currentTime) > 0) {
queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), nominalTime.getTime() - currentTime.getTime());
updateCoordAction(coordAction, false);
LOG.info("[" + actionId
+ "]::CoordPushDependency:: nominal Time is newer than current time, so requeue and wait. Current="
+ DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime));
return null;
}
CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies();
CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies();
if (coordPushInputDependency.getMissingDependenciesAsList().size() == 0) {
LOG.info("Nothing to check. Empty push missing dependency");
}
else {
List<String> missingDependenciesArray = coordPushInputDependency.getMissingDependenciesAsList();
LOG.info("First Push missing dependency is [{0}] ", missingDependenciesArray.get(0));
LOG.trace("Push missing dependencies are [{0}] ", missingDependenciesArray);
if (registerForNotification) {
LOG.debug("Register for notifications is true");
}
try {
Configuration actionConf = null;
try {
actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
}
catch (IOException e) {
throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
}
boolean isChangeInDependency = true;
boolean timeout = false;
ActionDependency actionDependency = coordPushInputDependency.checkPushMissingDependencies(coordAction,
registerForNotification);
// Check all dependencies during materialization to avoid registering in the cache.
// But check only first missing one afterwards similar to
// CoordActionInputCheckXCommand for efficiency. listPartitions is costly.
if (actionDependency.getMissingDependencies().size() == missingDependenciesArray.size()) {
isChangeInDependency = false;
}
else {
String stillMissingDeps = DependencyChecker.dependenciesAsString(actionDependency.getMissingDependencies());
coordPushInputDependency.setMissingDependencies(stillMissingDeps);
}
if (coordPushInputDependency.isDependencyMet()) {
// All push-based dependencies are available
onAllPushDependenciesAvailable(coordPullInputDependency.isDependencyMet());
}
else {
// Checking for timeout
timeout = isTimeout();
if (timeout) {
queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
}
else {
queue(new CoordPushDependencyCheckXCommand(coordAction.getId()),
getCoordPushCheckRequeueInterval());
}
}
updateCoordAction(coordAction, isChangeInDependency || coordPushInputDependency.isDependencyMet());
if (registerForNotification) {
registerForNotification(coordPushInputDependency.getMissingDependenciesAsList(), actionConf);
}
if (removeAvailDependencies) {
unregisterAvailableDependencies(actionDependency.getAvailableDependencies());
}
if (timeout) {
unregisterMissingDependencies(coordPushInputDependency.getMissingDependenciesAsList(), actionId);
}
}
catch (Exception e) {
final CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
if (isTimeout()) {
LOG.debug("Queueing timeout command");
// XCommand.queue() will not work when there is a Exception
callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(),
coordJob.getAppName()));
unregisterMissingDependencies(missingDependenciesArray, actionId);
}
else if (coordPullInputDependency.getMissingDependenciesAsList().size() > 0) {
// Queue again on exception as RecoveryService will not queue this again with
// the action being updated regularly by CoordActionInputCheckXCommand
callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId(),
registerForNotification, removeAvailDependencies),
Services.get().getConf().getInt(RecoveryService.CONF_COORD_OLDER_THAN, 600) * 1000);
}
throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
}
}
return null;
}
/**
* Return the re-queue interval for coord push dependency check
* @return requeueInterval returns the requeue interval for coord push dependency check
*/
public long getCoordPushCheckRequeueInterval() {
long requeueInterval = ConfigurationService.getLong(CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL);
return requeueInterval;
}
/**
* Returns true if timeout period has been reached
*
* @return true if it is time for timeout else false
*/
protected boolean isTimeout() {
long waitingTime = (new Date().getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
.getCreatedTime().getTime()))
/ (60 * 1000);
int timeOut = coordAction.getTimeOut();
return (timeOut >= 0) && (waitingTime > timeOut);
}
protected void onAllPushDependenciesAvailable(boolean isPullDependencyMeet) throws CommandException {
Services.get().get(PartitionDependencyManagerService.class)
.removeCoordActionWithDependenciesAvailable(coordAction.getId());
if (isPullDependencyMeet) {
Date nominalTime = coordAction.getNominalTime();
Date currentTime = new Date();
// The action should become READY only if current time > nominal time;
// CoordActionInputCheckXCommand will take care of moving it to READY when it is nominal time.
if (nominalTime.compareTo(currentTime) > 0) {
LOG.info("[" + actionId + "]::ActionInputCheck:: nominal Time is newer than current time. Current="
+ DateUtils.formatDateOozieTZ(currentTime) + ", nominal="
+ DateUtils.formatDateOozieTZ(nominalTime));
}
else {
String actionXml = resolveCoordConfiguration();
coordAction.setActionXml(actionXml);
coordAction.setStatus(CoordinatorAction.Status.READY);
// pass jobID to the CoordActionReadyXCommand
queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
}
}
else if (isTimeout()) {
// If it is timeout and all push dependencies are available but still some unresolved
// missing dependencies queue CoordActionInputCheckXCommand now. Else it will have to
// wait till RecoveryService kicks in
queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()));
}
coordAction.getPushInputDependencies().setDependencyMet(true);
}
private String resolveCoordConfiguration() throws CommandException {
try {
Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf,
actionId, coordAction.getPullInputDependencies(), coordAction
.getPushInputDependencies());
actionXml.replace(0, actionXml.length(), newActionXml);
return actionXml.toString();
}
catch (ElException e) {
coordAction.setStatus(CoordinatorAction.Status.FAILED);
updateCoordAction(coordAction, true);
throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
}
catch (Exception e) {
throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
}
}
protected void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
throws CommandException {
coordAction.setLastModifiedTime(new Date());
if (jpaService != null) {
try {
if (isChangeInDependency) {
coordAction.setPushMissingDependencies(coordAction.getPushInputDependencies().serialize());
CoordActionQueryExecutor.getInstance().executeUpdate(
CoordActionQuery.UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK, coordAction);
if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) {
// since event is not to be generated unless action
// RUNNING via StartX
generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
}
}
else {
CoordActionQueryExecutor.getInstance().executeUpdate(
CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction);
}
}
catch (JPAExecutorException jex) {
throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
}
catch (IOException ioe) {
throw new CommandException(ErrorCode.E1021, ioe.getMessage(), ioe);
}
}
}
private void registerForNotification(List<String> missingDeps, Configuration actionConf) {
URIHandlerService uriService = Services.get().get(URIHandlerService.class);
String user = actionConf.get(OozieClient.USER_NAME, OozieClient.USER_NAME);
for (String missingDep : missingDeps) {
try {
URI missingURI = new URI(missingDep);
URIHandler handler = uriService.getURIHandler(missingURI);
handler.registerForNotification(missingURI, actionConf, user, actionId);
LOG.debug("Registered uri [{0}] for notifications", missingURI);
}
catch (Exception e) {
LOG.warn("Exception while registering uri [{0}] for notifications", missingDep, e);
}
}
}
private void unregisterAvailableDependencies(List<String> availableDeps) {
URIHandlerService uriService = Services.get().get(URIHandlerService.class);
for (String availableDep : availableDeps) {
try {
URI availableURI = new URI(availableDep);
URIHandler handler = uriService.getURIHandler(availableURI);
if (handler.unregisterFromNotification(availableURI, actionId)) {
LOG.debug("Successfully unregistered uri [{0}] from notifications", availableURI);
}
else {
LOG.warn("Unable to unregister uri [{0}] from notifications", availableURI);
}
}
catch (Exception e) {
LOG.warn("Exception while unregistering uri [{0}] from notifications", availableDep, e);
}
}
}
public static void unregisterMissingDependencies(List<String> missingDeps, String actionId) {
final XLog LOG = XLog.getLog(CoordPushDependencyCheckXCommand.class);
URIHandlerService uriService = Services.get().get(URIHandlerService.class);
for (String missingDep : missingDeps) {
try {
URI missingURI = new URI(missingDep);
URIHandler handler = uriService.getURIHandler(missingURI);
if (handler.unregisterFromNotification(missingURI, actionId)) {
LOG.debug("Successfully unregistered uri [{0}] from notifications", missingURI);
}
else {
LOG.warn("Unable to unregister uri [{0}] from notifications", missingURI);
}
}
catch (Exception e) {
LOG.warn("Exception while unregistering uri [{0}] from notifications", missingDep, e);
}
}
}
@Override
public String getEntityKey() {
return actionId.substring(0, actionId.indexOf("@"));
}
@Override
public String getKey(){
return getName() + "_" + actionId;
}
@Override
protected boolean isLockRequired() {
return true;
}
@Override
protected void loadState() throws CommandException {
jpaService = Services.get().get(JPAService.class);
try {
coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
if (coordAction != null) {
coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
LogUtils.setLogInfo(coordAction);
}
else {
throw new CommandException(ErrorCode.E0605, actionId);
}
}
catch (JPAExecutorException je) {
final CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
callableQueueService.queue(new CoordPushDependencyCheckXCommand(actionId), getCoordPushCheckRequeueInterval());
throw new CommandException(je);
}
}
@Override
protected void verifyPrecondition() throws CommandException, PreconditionException {
if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
throw new PreconditionException(ErrorCode.E1100, "[" + actionId
+ "]::CoordPushDependencyCheck:: Ignoring action. Should be in WAITING state, but state="
+ coordAction.getStatus());
}
// if eligible to do action input check when running with backward
// support is true
if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
return;
}
if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR
&& coordJob.getStatus() != Job.Status.PAUSED && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
throw new PreconditionException(ErrorCode.E1100, "[" + actionId
+ "]::CoordPushDependencyCheck:: Ignoring action."
+ " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
+ coordJob.getStatus());
}
}
}