| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.oozie.command.coord; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.oozie.AppType; |
| import org.apache.oozie.CoordinatorActionBean; |
| import org.apache.oozie.CoordinatorJobBean; |
| import org.apache.oozie.ErrorCode; |
| import org.apache.oozie.SLAEventBean; |
| import org.apache.oozie.client.CoordinatorJob; |
| import org.apache.oozie.client.Job; |
| import org.apache.oozie.client.SLAEvent.SlaAppType; |
| import org.apache.oozie.client.rest.JsonBean; |
| import org.apache.oozie.command.CommandException; |
| import org.apache.oozie.command.MaterializeTransitionXCommand; |
| import org.apache.oozie.command.PreconditionException; |
| import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; |
| import org.apache.oozie.coord.CoordUtils; |
| import org.apache.oozie.coord.TimeUnit; |
| import org.apache.oozie.executor.jpa.BatchQueryExecutor; |
| import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; |
| import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor; |
| import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; |
| import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; |
| import org.apache.oozie.executor.jpa.JPAExecutorException; |
| import org.apache.oozie.service.ConfigurationService; |
| import org.apache.oozie.service.CoordMaterializeTriggerService; |
| import org.apache.oozie.service.EventHandlerService; |
| import org.apache.oozie.service.JPAService; |
| import org.apache.oozie.service.Service; |
| import org.apache.oozie.service.Services; |
| import org.apache.oozie.sla.SLAOperations; |
| import org.apache.oozie.util.DateUtils; |
| import org.apache.oozie.util.Instrumentation; |
| import org.apache.oozie.util.LogUtils; |
| import org.apache.oozie.util.ParamChecker; |
| import org.apache.oozie.util.StatusUtils; |
| import org.apache.oozie.util.XConfiguration; |
| import org.apache.oozie.util.XmlUtils; |
| import org.apache.oozie.util.db.SLADbOperations; |
| import org.jdom.Element; |
| import org.jdom.JDOMException; |
| |
| import java.io.IOException; |
| import java.io.StringReader; |
| import java.sql.Timestamp; |
| import java.util.Calendar; |
| import java.util.Date; |
| import java.util.TimeZone; |
| |
| /** |
| * Materialize actions for specified start and end time for coordinator job. |
| */ |
| @SuppressWarnings("deprecation") |
| public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand { |
| |
| private JPAService jpaService = null; |
| private CoordinatorJobBean coordJob = null; |
| private String jobId = null; |
| private Date startMatdTime = null; |
| private Date endMatdTime = null; |
| private final int materializationWindow; |
| private int lastActionNumber = 1; // over-ride by DB value |
| private CoordinatorJob.Status prevStatus = null; |
| |
| static final private int lookAheadWindow = ConfigurationService.getInt(CoordMaterializeTriggerService |
| .CONF_LOOKUP_INTERVAL); |
| |
| /** |
| * Default MAX timeout in minutes, after which coordinator input check will timeout |
| */ |
| public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout"; |
| |
| /** |
| * The constructor for class {@link CoordMaterializeTransitionXCommand} |
| * |
| * @param jobId coordinator job id |
| * @param materializationWindow materialization window to calculate end time |
| */ |
| public CoordMaterializeTransitionXCommand(String jobId, int materializationWindow) { |
| super("coord_mater", "coord_mater", 1); |
| this.jobId = ParamChecker.notEmpty(jobId, "jobId"); |
| this.materializationWindow = materializationWindow; |
| } |
| |
| public CoordMaterializeTransitionXCommand(CoordinatorJobBean coordJob, int materializationWindow, Date startTime, |
| Date endTime) { |
| super("coord_mater", "coord_mater", 1); |
| this.jobId = ParamChecker.notEmpty(coordJob.getId(), "jobId"); |
| this.materializationWindow = materializationWindow; |
| this.coordJob = coordJob; |
| this.startMatdTime = startTime; |
| this.endMatdTime = endTime; |
| } |
| |
| @Override |
| protected void setLogInfo() { |
| LogUtils.setLogInfo(jobId); |
| } |
| |
| @Override |
| public void transitToNext() throws CommandException { |
| } |
| |
| @Override |
| public void updateJob() throws CommandException { |
| updateList.add(new UpdateEntry(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE,coordJob)); |
| } |
| |
| @Override |
| public void performWrites() throws CommandException { |
| try { |
| BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); |
| // register the partition related dependencies of actions |
| for (JsonBean actionBean : insertList) { |
| if (actionBean instanceof CoordinatorActionBean) { |
| CoordinatorActionBean coordAction = (CoordinatorActionBean) actionBean; |
| if (EventHandlerService.isEnabled()) { |
| CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null); |
| } |
| |
| // TODO: time 100s should be configurable |
| queue(new CoordActionNotificationXCommand(coordAction), 100); |
| |
| //Delay for input check = (nominal time - now) |
| long checkDelay = coordAction.getNominalTime().getTime() - new Date().getTime(); |
| queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), |
| Math.max(checkDelay, 0)); |
| |
| if (!StringUtils.isEmpty(coordAction.getPushMissingDependencies())) { |
| // TODO: Delay in catchup mode? |
| queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100); |
| } |
| } |
| } |
| } |
| catch (JPAExecutorException jex) { |
| throw new CommandException(jex); |
| } |
| } |
| |
| @Override |
| public String getEntityKey() { |
| return this.jobId; |
| } |
| |
| @Override |
| protected boolean isLockRequired() { |
| return true; |
| } |
| |
| @Override |
| protected void loadState() throws CommandException { |
| jpaService = Services.get().get(JPAService.class); |
| if (jpaService == null) { |
| LOG.error(ErrorCode.E0610); |
| } |
| |
| try { |
| coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_MATERIALIZE, jobId); |
| prevStatus = coordJob.getStatus(); |
| } |
| catch (JPAExecutorException jex) { |
| throw new CommandException(jex); |
| } |
| LogUtils.setLogInfo(coordJob); |
| // calculate start materialize and end materialize time |
| calcMatdTime(); |
| } |
| |
| /** |
| * Calculate startMatdTime and endMatdTime from job's start time if next materialized time is null |
| * |
| * @throws CommandException thrown if failed to calculate startMatdTime and endMatdTime |
| */ |
| protected void calcMatdTime() throws CommandException { |
| Timestamp startTime = coordJob.getNextMaterializedTimestamp(); |
| if (startTime == null) { |
| startTime = coordJob.getStartTimestamp(); |
| } |
| // calculate end time by adding materializationWindow to start time. |
| // need to convert materializationWindow from secs to milliseconds |
| long startTimeMilli = startTime.getTime(); |
| long endTimeMilli = startTimeMilli + (materializationWindow * 1000); |
| |
| startMatdTime = DateUtils.toDate(new Timestamp(startTimeMilli)); |
| endMatdTime = DateUtils.toDate(new Timestamp(endTimeMilli)); |
| endMatdTime = getMaterializationTimeForCatchUp(endMatdTime); |
| // if MaterializationWindow end time is greater than endTime |
| // for job, then set it to endTime of job |
| Date jobEndTime = coordJob.getEndTime(); |
| if (endMatdTime.compareTo(jobEndTime) > 0) { |
| endMatdTime = jobEndTime; |
| } |
| |
| LOG.debug("Materializing coord job id=" + jobId + ", start=" + DateUtils.formatDateOozieTZ(startMatdTime) + ", end=" |
| + DateUtils.formatDateOozieTZ(endMatdTime) |
| + ", window=" + materializationWindow); |
| } |
| |
| /** |
| * Get materialization for window for catch-up jobs. for current jobs,it reruns currentMatdate, For catch-up, end |
| * Mataterilized Time = startMatdTime + MatThrottling * frequency; unless LAST_ONLY execution order is set, in which |
| * case it returns now (to materialize all actions in the past) |
| * |
| * @param currentMatTime |
| * @return Date returns materialization for window for catch-up jobs |
| * @throws CommandException |
| * @throws JDOMException |
| */ |
| private Date getMaterializationTimeForCatchUp(Date currentMatTime) throws CommandException { |
| if (currentMatTime.after(new Date())) { |
| return currentMatTime; |
| } |
| if (coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY) || |
| coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.NONE)) { |
| return new Date(); |
| } |
| final int frequency; |
| try { |
| frequency = Integer.parseInt(coordJob.getFrequency()); |
| } |
| catch (final NumberFormatException e) { |
| // Cron based frequency: catching up at maximum till the coordinator job's end time, |
| // bounded also by the throttle parameter, aka the number of coordinator actions to materialize |
| return coordJob.getEndTime(); |
| } |
| |
| TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone()); |
| TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr()); |
| Calendar startInstance = Calendar.getInstance(appTz); |
| startInstance.setTime(startMatdTime); |
| Calendar endMatInstance = null; |
| Calendar previousInstance = startInstance; |
| for (int i = 1; i <= coordJob.getMatThrottling(); i++) { |
| endMatInstance = (Calendar) startInstance.clone(); |
| endMatInstance.add(freqTU.getCalendarUnit(), i * frequency); |
| if (endMatInstance.getTime().compareTo(new Date()) >= 0) { |
| if (previousInstance.getTime().after(currentMatTime)) { |
| return previousInstance.getTime(); |
| } |
| else { |
| return currentMatTime; |
| } |
| } |
| previousInstance = endMatInstance; |
| } |
| if (endMatInstance == null) { |
| return currentMatTime; |
| } |
| else { |
| return endMatInstance.getTime(); |
| } |
| } |
| |
| @Override |
| protected void verifyPrecondition() throws CommandException, PreconditionException { |
| if (!(coordJob.getStatus() == CoordinatorJobBean.Status.PREP || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNING |
| || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNINGWITHERROR)) { |
| throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId |
| + " job is not in PREP or RUNNING but in " + coordJob.getStatus()); |
| } |
| |
| if (coordJob.isDoneMaterialization()) { |
| throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId =" + jobId |
| + " job is already materialized"); |
| } |
| |
| if (coordJob.getNextMaterializedTimestamp() != null |
| && coordJob.getNextMaterializedTimestamp().compareTo(coordJob.getEndTimestamp()) >= 0) { |
| throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId |
| + " job is already materialized"); |
| } |
| |
| Timestamp startTime = coordJob.getNextMaterializedTimestamp(); |
| if (startTime == null) { |
| startTime = coordJob.getStartTimestamp(); |
| |
| if (startTime.after(new Timestamp(System.currentTimeMillis() + lookAheadWindow * 1000))) { |
| throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" |
| + jobId + " job's start time is not reached yet - nothing to materialize"); |
| } |
| } |
| |
| if (coordJob.getNextMaterializedTimestamp() != null |
| && coordJob.getNextMaterializedTimestamp().after( |
| new Timestamp(System.currentTimeMillis() + lookAheadWindow * 1000))) { |
| throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId |
| + " Request is for future time. Lookup time is " |
| + new Timestamp(System.currentTimeMillis() + lookAheadWindow * 1000) + " mat time is " |
| + coordJob.getNextMaterializedTimestamp()); |
| } |
| |
| if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(coordJob.getEndTime()) >= 0) { |
| throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId |
| + ", all actions have been materialized from start time = " + coordJob.getStartTime() |
| + " to end time = " + coordJob.getEndTime() + ", job status = " + coordJob.getStatusStr()); |
| } |
| |
| if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(endMatdTime) >= 0) { |
| throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId |
| + ", action is *already* materialized for Materialization start time = " + startMatdTime |
| + ", materialization end time = " + endMatdTime + ", job status = " + coordJob.getStatusStr()); |
| } |
| |
| if (endMatdTime.after(coordJob.getEndTime())) { |
| throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId |
| + " materialization end time = " + endMatdTime + " surpasses coordinator job's end time = " |
| + coordJob.getEndTime() + " job status = " + coordJob.getStatusStr()); |
| } |
| |
| if (coordJob.getPauseTime() != null && !startMatdTime.before(coordJob.getPauseTime())) { |
| throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId |
| + ", materialization start time = " + startMatdTime |
| + " is after or equal to coordinator job's pause time = " + coordJob.getPauseTime() |
| + ", job status = " + coordJob.getStatusStr()); |
| } |
| |
| } |
| |
| @Override |
| protected void materialize() throws CommandException { |
| Instrumentation.Cron cron = new Instrumentation.Cron(); |
| cron.start(); |
| try { |
| materializeActions(false); |
| updateJobMaterializeInfo(coordJob); |
| } |
| catch (CommandException ex) { |
| LOG.warn("Exception occurred:" + ex.getMessage() + " Making the job failed ", ex); |
| coordJob.setStatus(Job.Status.FAILED); |
| coordJob.resetPending(); |
| // remove any materialized actions and slaEvents |
| insertList.clear(); |
| } |
| catch (Exception e) { |
| LOG.error("Exception occurred:" + e.getMessage() + " Making the job failed ", e); |
| coordJob.setStatus(Job.Status.FAILED); |
| try { |
| CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE, coordJob); |
| } |
| catch (JPAExecutorException jex) { |
| throw new CommandException(ErrorCode.E1011, jex); |
| } |
| throw new CommandException(ErrorCode.E1012, e.getMessage(), e); |
| } finally { |
| cron.stop(); |
| instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".materialize", cron); |
| } |
| } |
| |
| /** |
| * Create action instances starting from "startMatdTime" to "endMatdTime" and store them into coord action table. |
| * |
| * @param dryrun if this is a dry run |
| * @return materialized action(s) |
| * @throws Exception thrown if failed to materialize actions |
| */ |
| protected String materializeActions(boolean dryrun) throws Exception { |
| |
| Configuration jobConf = null; |
| try { |
| jobConf = new XConfiguration(new StringReader(coordJob.getConf())); |
| } |
| catch (IOException ioe) { |
| LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe); |
| throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe); |
| } |
| |
| String jobXml = coordJob.getJobXml(); |
| Element eJob = XmlUtils.parseXml(jobXml); |
| TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone()); |
| |
| String frequency = coordJob.getFrequency(); |
| TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr()); |
| TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration")); |
| Calendar start = Calendar.getInstance(appTz); |
| start.setTime(startMatdTime); |
| DateUtils.moveToEnd(start, endOfFlag); |
| Calendar end = Calendar.getInstance(appTz); |
| end.setTime(endMatdTime); |
| lastActionNumber = coordJob.getLastActionNumber(); |
| //Intentionally printing dates in their own timezone, not Oozie timezone |
| LOG.info("materialize actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime() + ", end=" |
| + end.getTime() + ",\n timeUnit " + freqTU.getCalendarUnit() + ",\n frequency :" + frequency + ":" |
| + freqTU + ",\n lastActionNumber " + lastActionNumber); |
| // Keep the actual start time |
| Calendar origStart = Calendar.getInstance(appTz); |
| origStart.setTime(coordJob.getStartTimestamp()); |
| // Move to the End of duration, if needed. |
| DateUtils.moveToEnd(origStart, endOfFlag); |
| |
| StringBuilder actionStrings = new StringBuilder(); |
| Date jobPauseTime = coordJob.getPauseTime(); |
| Calendar pause = null; |
| if (jobPauseTime != null) { |
| pause = Calendar.getInstance(appTz); |
| pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime)); |
| } |
| |
| String action = null; |
| int numWaitingActions = dryrun ? 0 : jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId())); |
| int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions; |
| // If LAST_ONLY and all materialization is in the past, ignore maxActionsToBeCreated |
| boolean ignoreMaxActions = |
| (coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY) || |
| coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.NONE)) |
| && endMatdTime.before(new Date()); |
| LOG.debug("Coordinator job :" + coordJob.getId() + ", maxActionToBeCreated :" + maxActionToBeCreated |
| + ", Mat_Throttle :" + coordJob.getMatThrottling() + ", numWaitingActions :" + numWaitingActions); |
| |
| boolean isCronFrequency = false; |
| |
| Calendar effStart = (Calendar) start.clone(); |
| try { |
| int intFrequency = Integer.parseInt(coordJob.getFrequency()); |
| effStart = (Calendar) origStart.clone(); |
| effStart.add(freqTU.getCalendarUnit(), lastActionNumber * intFrequency); |
| } |
| catch (NumberFormatException e) { |
| isCronFrequency = true; |
| } |
| |
| boolean firstMater = true; |
| |
| while (effStart.compareTo(end) < 0 && (ignoreMaxActions || maxActionToBeCreated-- > 0)) { |
| if (pause != null && effStart.compareTo(pause) >= 0) { |
| break; |
| } |
| |
| Date nextTime = effStart.getTime(); |
| |
| if (isCronFrequency) { |
| if (effStart.getTime().compareTo(startMatdTime) == 0 && firstMater) { |
| effStart.add(Calendar.MINUTE, -1); |
| firstMater = false; |
| } |
| nextTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(), coordJob); |
| Date prevTime = new Date(effStart.getTimeInMillis()); |
| effStart.setTime(nextTime); |
| addDSTChangeToNominalTime(prevTime, nextTime, coordJob); |
| } |
| |
| if (effStart.compareTo(end) < 0) { |
| |
| if (pause != null && effStart.compareTo(pause) >= 0) { |
| break; |
| } |
| CoordinatorActionBean actionBean = new CoordinatorActionBean(); |
| lastActionNumber++; |
| |
| int timeout = coordJob.getTimeout(); |
| LOG.debug("Materializing action for time=" + DateUtils.formatDateOozieTZ(effStart.getTime()) |
| + ", lastactionnumber=" + lastActionNumber + " timeout=" + timeout + " minutes"); |
| Date actualTime = new Date(); |
| action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), |
| nextTime, actualTime, lastActionNumber, jobConf, actionBean); |
| actionBean.setTimeOut(timeout); |
| if (!dryrun) { |
| storeToDB(actionBean, action, jobConf); // Storing to table |
| |
| } |
| else { |
| actionStrings.append("action for new instance"); |
| actionStrings.append(action); |
| } |
| } |
| else { |
| break; |
| } |
| |
| if (!isCronFrequency) { |
| effStart = (Calendar) origStart.clone(); |
| effStart.add(freqTU.getCalendarUnit(), lastActionNumber * Integer.parseInt(coordJob.getFrequency())); |
| } |
| } |
| |
| if (isCronFrequency) { |
| if (effStart.compareTo(end) < 0 && !(ignoreMaxActions || maxActionToBeCreated-- > 0)) { |
| //Since we exceed the throttle, we need to move the nextMadtime forward |
| //to avoid creating duplicate actions |
| if (!firstMater) { |
| effStart.setTime(CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(), coordJob)); |
| } |
| } |
| } |
| |
| endMatdTime = effStart.getTime(); |
| |
| if (!dryrun) { |
| return action; |
| } |
| else { |
| return actionStrings.toString(); |
| } |
| } |
| |
| /** |
| * Apply DST correction according the job`s timezone, if the difference between the previous nominal time and the actual one |
| * is greater or equal than 24 hours. |
| * Calendar uses a similar approach: applies DST change if the TimeUnit is lower or equal than TimeUnit.DAY. With this approach |
| * a similar behaviour can be achieved. |
| * |
| * @see {@http://oozie.apache.org/docs/5.0.0/CoordinatorFunctionalSpec.html#a7._Handling_Timezones_and_Daylight_Saving_Time} |
| * |
| * @param prevTime nominal time of the previous coordinator action |
| * @param nextTime nominal time of the actual coordinator action |
| * @param coordJob the coordinator job |
| */ |
| private void addDSTChangeToNominalTime(Date prevTime, Date nextTime, CoordinatorJobBean coordJob) { |
| final long differenceBetweenTwoActionsInSec = java.util.concurrent.TimeUnit.MILLISECONDS.toSeconds |
| (Math.abs(prevTime.getTime() - nextTime.getTime())); |
| final long oneDayInSeconds = java.util.concurrent.TimeUnit.DAYS.toSeconds(1); |
| if (differenceBetweenTwoActionsInSec < oneDayInSeconds) { |
| return; |
| } |
| final long dstOffset = DaylightOffsetCalculator.getDSTOffset(DateUtils |
| .getTimeZone(coordJob.getTimeZone()), coordJob.getStartTime(), nextTime); |
| LOG.debug("[{0}] ms DST offset applied to nominal time for coordinator job: [{1}]", dstOffset, coordJob.getId()); |
| nextTime.setTime(nextTime.getTime() + dstOffset); |
| } |
| |
| private void storeToDB(CoordinatorActionBean actionBean, String actionXml, Configuration jobConf) throws Exception { |
| LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = " |
| + actionXml.length()); |
| actionBean.setActionXml(actionXml); |
| insertList.add(actionBean); |
| writeActionSlaRegistration(actionXml, actionBean, jobConf); |
| } |
| |
| private void writeActionSlaRegistration(String actionXml, CoordinatorActionBean actionBean, Configuration jobConf) |
| throws Exception { |
| Element eAction = XmlUtils.parseXml(actionXml); |
| Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")); |
| SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), |
| SlaAppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getGroup(), LOG); |
| if (slaEvent != null) { |
| insertList.add(slaEvent); |
| } |
| // inserting into new table also |
| SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(), |
| AppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getAppName(), LOG, false, |
| CoordUtils.isSlaAlertDisabled(actionBean, coordJob.getAppName(), jobConf)); |
| } |
| |
| private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException { |
| job.setLastActionTime(endMatdTime); |
| job.setLastActionNumber(lastActionNumber); |
| // if the job endtime == action endtime, we don't need to materialize this job anymore |
| Date jobEndTime = job.getEndTime(); |
| |
| if (job.getStatus() == CoordinatorJob.Status.PREP){ |
| LOG.info("[" + job.getId() + "]: Update status from " + job.getStatus() + " to RUNNING"); |
| job.setStatus(Job.Status.RUNNING); |
| } |
| job.setPending(); |
| Calendar end = Calendar.getInstance(); |
| end.setTime(jobEndTime); |
| |
| if (end.getTime().compareTo(endMatdTime) <= 0) { |
| LOG.info("[" + job.getId() + "]: all actions have been materialized, set pending to true"); |
| // set doneMaterialization to true when materialization is done |
| job.setDoneMaterialization(); |
| } |
| job.setStatus(StatusUtils.getStatus(job)); |
| LOG.info("Coord Job status updated to = " + job.getStatus()); |
| job.setNextMaterializedTime(endMatdTime); |
| } |
| |
| @Override |
| public String getKey() { |
| return getName() + "_" + jobId; |
| } |
| |
| @Override |
| public void notifyParent() throws CommandException { |
| // update bundle action only when status changes in coord job |
| if (this.coordJob.getBundleId() != null) { |
| if (!prevStatus.equals(coordJob.getStatus())) { |
| BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); |
| bundleStatusUpdate.call(); |
| } |
| } |
| } |
| } |