blob: 4faa0da9399dd2f7f39abadaca0b716e96992350 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.command.coord;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.AppType;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.MaterializeTransitionXCommand;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.coord.CoordUtils;
import org.apache.oozie.coord.TimeUnit;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.CoordMaterializeTriggerService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.SLAOperations;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.StatusUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.util.db.SLADbOperations;
import org.jdom.Element;
import org.jdom.JDOMException;
import java.io.IOException;
import java.io.StringReader;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;
/**
* Materialize actions for specified start and end time for coordinator job.
*/
@SuppressWarnings("deprecation")
public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand {
private JPAService jpaService = null;
private CoordinatorJobBean coordJob = null;
private String jobId = null;
private Date startMatdTime = null;
private Date endMatdTime = null;
private final int materializationWindow;
private int lastActionNumber = 1; // over-ride by DB value
private CoordinatorJob.Status prevStatus = null;
static final private int lookAheadWindow = ConfigurationService.getInt(CoordMaterializeTriggerService
.CONF_LOOKUP_INTERVAL);
/**
* Default MAX timeout in minutes, after which coordinator input check will timeout
*/
public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
/**
* The constructor for class {@link CoordMaterializeTransitionXCommand}
*
* @param jobId coordinator job id
* @param materializationWindow materialization window to calculate end time
*/
public CoordMaterializeTransitionXCommand(String jobId, int materializationWindow) {
super("coord_mater", "coord_mater", 1);
this.jobId = ParamChecker.notEmpty(jobId, "jobId");
this.materializationWindow = materializationWindow;
}
public CoordMaterializeTransitionXCommand(CoordinatorJobBean coordJob, int materializationWindow, Date startTime,
Date endTime) {
super("coord_mater", "coord_mater", 1);
this.jobId = ParamChecker.notEmpty(coordJob.getId(), "jobId");
this.materializationWindow = materializationWindow;
this.coordJob = coordJob;
this.startMatdTime = startTime;
this.endMatdTime = endTime;
}
@Override
protected void setLogInfo() {
LogUtils.setLogInfo(jobId);
}
@Override
public void transitToNext() throws CommandException {
}
@Override
public void updateJob() throws CommandException {
updateList.add(new UpdateEntry(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE,coordJob));
}
@Override
public void performWrites() throws CommandException {
try {
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
// register the partition related dependencies of actions
for (JsonBean actionBean : insertList) {
if (actionBean instanceof CoordinatorActionBean) {
CoordinatorActionBean coordAction = (CoordinatorActionBean) actionBean;
if (EventHandlerService.isEnabled()) {
CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
}
// TODO: time 100s should be configurable
queue(new CoordActionNotificationXCommand(coordAction), 100);
//Delay for input check = (nominal time - now)
long checkDelay = coordAction.getNominalTime().getTime() - new Date().getTime();
queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
Math.max(checkDelay, 0));
if (!StringUtils.isEmpty(coordAction.getPushMissingDependencies())) {
// TODO: Delay in catchup mode?
queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100);
}
}
}
}
catch (JPAExecutorException jex) {
throw new CommandException(jex);
}
}
@Override
public String getEntityKey() {
return this.jobId;
}
@Override
protected boolean isLockRequired() {
return true;
}
@Override
protected void loadState() throws CommandException {
jpaService = Services.get().get(JPAService.class);
if (jpaService == null) {
LOG.error(ErrorCode.E0610);
}
try {
coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_MATERIALIZE, jobId);
prevStatus = coordJob.getStatus();
}
catch (JPAExecutorException jex) {
throw new CommandException(jex);
}
LogUtils.setLogInfo(coordJob);
// calculate start materialize and end materialize time
calcMatdTime();
}
/**
* Calculate startMatdTime and endMatdTime from job's start time if next materialized time is null
*
* @throws CommandException thrown if failed to calculate startMatdTime and endMatdTime
*/
protected void calcMatdTime() throws CommandException {
Timestamp startTime = coordJob.getNextMaterializedTimestamp();
if (startTime == null) {
startTime = coordJob.getStartTimestamp();
}
// calculate end time by adding materializationWindow to start time.
// need to convert materializationWindow from secs to milliseconds
long startTimeMilli = startTime.getTime();
long endTimeMilli = startTimeMilli + (materializationWindow * 1000);
startMatdTime = DateUtils.toDate(new Timestamp(startTimeMilli));
endMatdTime = DateUtils.toDate(new Timestamp(endTimeMilli));
endMatdTime = getMaterializationTimeForCatchUp(endMatdTime);
// if MaterializationWindow end time is greater than endTime
// for job, then set it to endTime of job
Date jobEndTime = coordJob.getEndTime();
if (endMatdTime.compareTo(jobEndTime) > 0) {
endMatdTime = jobEndTime;
}
LOG.debug("Materializing coord job id=" + jobId + ", start=" + DateUtils.formatDateOozieTZ(startMatdTime) + ", end="
+ DateUtils.formatDateOozieTZ(endMatdTime)
+ ", window=" + materializationWindow);
}
/**
* Get materialization for window for catch-up jobs. for current jobs,it reruns currentMatdate, For catch-up, end
* Mataterilized Time = startMatdTime + MatThrottling * frequency; unless LAST_ONLY execution order is set, in which
* case it returns now (to materialize all actions in the past)
*
* @param currentMatTime
* @return Date returns materialization for window for catch-up jobs
* @throws CommandException
* @throws JDOMException
*/
private Date getMaterializationTimeForCatchUp(Date currentMatTime) throws CommandException {
if (currentMatTime.after(new Date())) {
return currentMatTime;
}
if (coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY) ||
coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.NONE)) {
return new Date();
}
final int frequency;
try {
frequency = Integer.parseInt(coordJob.getFrequency());
}
catch (final NumberFormatException e) {
// Cron based frequency: catching up at maximum till the coordinator job's end time,
// bounded also by the throttle parameter, aka the number of coordinator actions to materialize
return coordJob.getEndTime();
}
TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr());
Calendar startInstance = Calendar.getInstance(appTz);
startInstance.setTime(startMatdTime);
Calendar endMatInstance = null;
Calendar previousInstance = startInstance;
for (int i = 1; i <= coordJob.getMatThrottling(); i++) {
endMatInstance = (Calendar) startInstance.clone();
endMatInstance.add(freqTU.getCalendarUnit(), i * frequency);
if (endMatInstance.getTime().compareTo(new Date()) >= 0) {
if (previousInstance.getTime().after(currentMatTime)) {
return previousInstance.getTime();
}
else {
return currentMatTime;
}
}
previousInstance = endMatInstance;
}
if (endMatInstance == null) {
return currentMatTime;
}
else {
return endMatInstance.getTime();
}
}
@Override
protected void verifyPrecondition() throws CommandException, PreconditionException {
if (!(coordJob.getStatus() == CoordinatorJobBean.Status.PREP || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNING
|| coordJob.getStatus() == CoordinatorJobBean.Status.RUNNINGWITHERROR)) {
throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
+ " job is not in PREP or RUNNING but in " + coordJob.getStatus());
}
if (coordJob.isDoneMaterialization()) {
throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId =" + jobId
+ " job is already materialized");
}
if (coordJob.getNextMaterializedTimestamp() != null
&& coordJob.getNextMaterializedTimestamp().compareTo(coordJob.getEndTimestamp()) >= 0) {
throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
+ " job is already materialized");
}
Timestamp startTime = coordJob.getNextMaterializedTimestamp();
if (startTime == null) {
startTime = coordJob.getStartTimestamp();
if (startTime.after(new Timestamp(System.currentTimeMillis() + lookAheadWindow * 1000))) {
throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId="
+ jobId + " job's start time is not reached yet - nothing to materialize");
}
}
if (coordJob.getNextMaterializedTimestamp() != null
&& coordJob.getNextMaterializedTimestamp().after(
new Timestamp(System.currentTimeMillis() + lookAheadWindow * 1000))) {
throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
+ " Request is for future time. Lookup time is "
+ new Timestamp(System.currentTimeMillis() + lookAheadWindow * 1000) + " mat time is "
+ coordJob.getNextMaterializedTimestamp());
}
if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(coordJob.getEndTime()) >= 0) {
throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
+ ", all actions have been materialized from start time = " + coordJob.getStartTime()
+ " to end time = " + coordJob.getEndTime() + ", job status = " + coordJob.getStatusStr());
}
if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(endMatdTime) >= 0) {
throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
+ ", action is *already* materialized for Materialization start time = " + startMatdTime
+ ", materialization end time = " + endMatdTime + ", job status = " + coordJob.getStatusStr());
}
if (endMatdTime.after(coordJob.getEndTime())) {
throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
+ " materialization end time = " + endMatdTime + " surpasses coordinator job's end time = "
+ coordJob.getEndTime() + " job status = " + coordJob.getStatusStr());
}
if (coordJob.getPauseTime() != null && !startMatdTime.before(coordJob.getPauseTime())) {
throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
+ ", materialization start time = " + startMatdTime
+ " is after or equal to coordinator job's pause time = " + coordJob.getPauseTime()
+ ", job status = " + coordJob.getStatusStr());
}
}
@Override
protected void materialize() throws CommandException {
Instrumentation.Cron cron = new Instrumentation.Cron();
cron.start();
try {
materializeActions(false);
updateJobMaterializeInfo(coordJob);
}
catch (CommandException ex) {
LOG.warn("Exception occurred:" + ex.getMessage() + " Making the job failed ", ex);
coordJob.setStatus(Job.Status.FAILED);
coordJob.resetPending();
// remove any materialized actions and slaEvents
insertList.clear();
}
catch (Exception e) {
LOG.error("Exception occurred:" + e.getMessage() + " Making the job failed ", e);
coordJob.setStatus(Job.Status.FAILED);
try {
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE, coordJob);
}
catch (JPAExecutorException jex) {
throw new CommandException(ErrorCode.E1011, jex);
}
throw new CommandException(ErrorCode.E1012, e.getMessage(), e);
} finally {
cron.stop();
instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".materialize", cron);
}
}
/**
* Create action instances starting from "startMatdTime" to "endMatdTime" and store them into coord action table.
*
* @param dryrun if this is a dry run
* @return materialized action(s)
* @throws Exception thrown if failed to materialize actions
*/
protected String materializeActions(boolean dryrun) throws Exception {
Configuration jobConf = null;
try {
jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
}
catch (IOException ioe) {
LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe);
}
String jobXml = coordJob.getJobXml();
Element eJob = XmlUtils.parseXml(jobXml);
TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
String frequency = coordJob.getFrequency();
TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr());
TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
Calendar start = Calendar.getInstance(appTz);
start.setTime(startMatdTime);
DateUtils.moveToEnd(start, endOfFlag);
Calendar end = Calendar.getInstance(appTz);
end.setTime(endMatdTime);
lastActionNumber = coordJob.getLastActionNumber();
//Intentionally printing dates in their own timezone, not Oozie timezone
LOG.info("materialize actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime() + ", end="
+ end.getTime() + ",\n timeUnit " + freqTU.getCalendarUnit() + ",\n frequency :" + frequency + ":"
+ freqTU + ",\n lastActionNumber " + lastActionNumber);
// Keep the actual start time
Calendar origStart = Calendar.getInstance(appTz);
origStart.setTime(coordJob.getStartTimestamp());
// Move to the End of duration, if needed.
DateUtils.moveToEnd(origStart, endOfFlag);
StringBuilder actionStrings = new StringBuilder();
Date jobPauseTime = coordJob.getPauseTime();
Calendar pause = null;
if (jobPauseTime != null) {
pause = Calendar.getInstance(appTz);
pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime));
}
String action = null;
int numWaitingActions = dryrun ? 0 : jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions;
// If LAST_ONLY and all materialization is in the past, ignore maxActionsToBeCreated
boolean ignoreMaxActions =
(coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY) ||
coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.NONE))
&& endMatdTime.before(new Date());
LOG.debug("Coordinator job :" + coordJob.getId() + ", maxActionToBeCreated :" + maxActionToBeCreated
+ ", Mat_Throttle :" + coordJob.getMatThrottling() + ", numWaitingActions :" + numWaitingActions);
boolean isCronFrequency = false;
Calendar effStart = (Calendar) start.clone();
try {
int intFrequency = Integer.parseInt(coordJob.getFrequency());
effStart = (Calendar) origStart.clone();
effStart.add(freqTU.getCalendarUnit(), lastActionNumber * intFrequency);
}
catch (NumberFormatException e) {
isCronFrequency = true;
}
boolean firstMater = true;
while (effStart.compareTo(end) < 0 && (ignoreMaxActions || maxActionToBeCreated-- > 0)) {
if (pause != null && effStart.compareTo(pause) >= 0) {
break;
}
Date nextTime = effStart.getTime();
if (isCronFrequency) {
if (effStart.getTime().compareTo(startMatdTime) == 0 && firstMater) {
effStart.add(Calendar.MINUTE, -1);
firstMater = false;
}
nextTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(), coordJob);
Date prevTime = new Date(effStart.getTimeInMillis());
effStart.setTime(nextTime);
addDSTChangeToNominalTime(prevTime, nextTime, coordJob);
}
if (effStart.compareTo(end) < 0) {
if (pause != null && effStart.compareTo(pause) >= 0) {
break;
}
CoordinatorActionBean actionBean = new CoordinatorActionBean();
lastActionNumber++;
int timeout = coordJob.getTimeout();
LOG.debug("Materializing action for time=" + DateUtils.formatDateOozieTZ(effStart.getTime())
+ ", lastactionnumber=" + lastActionNumber + " timeout=" + timeout + " minutes");
Date actualTime = new Date();
action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(),
nextTime, actualTime, lastActionNumber, jobConf, actionBean);
actionBean.setTimeOut(timeout);
if (!dryrun) {
storeToDB(actionBean, action, jobConf); // Storing to table
}
else {
actionStrings.append("action for new instance");
actionStrings.append(action);
}
}
else {
break;
}
if (!isCronFrequency) {
effStart = (Calendar) origStart.clone();
effStart.add(freqTU.getCalendarUnit(), lastActionNumber * Integer.parseInt(coordJob.getFrequency()));
}
}
if (isCronFrequency) {
if (effStart.compareTo(end) < 0 && !(ignoreMaxActions || maxActionToBeCreated-- > 0)) {
//Since we exceed the throttle, we need to move the nextMadtime forward
//to avoid creating duplicate actions
if (!firstMater) {
effStart.setTime(CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(), coordJob));
}
}
}
endMatdTime = effStart.getTime();
if (!dryrun) {
return action;
}
else {
return actionStrings.toString();
}
}
/**
* Apply DST correction according the job`s timezone, if the difference between the previous nominal time and the actual one
* is greater or equal than 24 hours.
* Calendar uses a similar approach: applies DST change if the TimeUnit is lower or equal than TimeUnit.DAY. With this approach
* a similar behaviour can be achieved.
*
* @see {@http://oozie.apache.org/docs/5.0.0/CoordinatorFunctionalSpec.html#a7._Handling_Timezones_and_Daylight_Saving_Time}
*
* @param prevTime nominal time of the previous coordinator action
* @param nextTime nominal time of the actual coordinator action
* @param coordJob the coordinator job
*/
private void addDSTChangeToNominalTime(Date prevTime, Date nextTime, CoordinatorJobBean coordJob) {
final long differenceBetweenTwoActionsInSec = java.util.concurrent.TimeUnit.MILLISECONDS.toSeconds
(Math.abs(prevTime.getTime() - nextTime.getTime()));
final long oneDayInSeconds = java.util.concurrent.TimeUnit.DAYS.toSeconds(1);
if (differenceBetweenTwoActionsInSec < oneDayInSeconds) {
return;
}
final long dstOffset = DaylightOffsetCalculator.getDSTOffset(DateUtils
.getTimeZone(coordJob.getTimeZone()), coordJob.getStartTime(), nextTime);
LOG.debug("[{0}] ms DST offset applied to nominal time for coordinator job: [{1}]", dstOffset, coordJob.getId());
nextTime.setTime(nextTime.getTime() + dstOffset);
}
private void storeToDB(CoordinatorActionBean actionBean, String actionXml, Configuration jobConf) throws Exception {
LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = "
+ actionXml.length());
actionBean.setActionXml(actionXml);
insertList.add(actionBean);
writeActionSlaRegistration(actionXml, actionBean, jobConf);
}
private void writeActionSlaRegistration(String actionXml, CoordinatorActionBean actionBean, Configuration jobConf)
throws Exception {
Element eAction = XmlUtils.parseXml(actionXml);
Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(),
SlaAppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getGroup(), LOG);
if (slaEvent != null) {
insertList.add(slaEvent);
}
// inserting into new table also
SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(),
AppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getAppName(), LOG, false,
CoordUtils.isSlaAlertDisabled(actionBean, coordJob.getAppName(), jobConf));
}
private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException {
job.setLastActionTime(endMatdTime);
job.setLastActionNumber(lastActionNumber);
// if the job endtime == action endtime, we don't need to materialize this job anymore
Date jobEndTime = job.getEndTime();
if (job.getStatus() == CoordinatorJob.Status.PREP){
LOG.info("[" + job.getId() + "]: Update status from " + job.getStatus() + " to RUNNING");
job.setStatus(Job.Status.RUNNING);
}
job.setPending();
Calendar end = Calendar.getInstance();
end.setTime(jobEndTime);
if (end.getTime().compareTo(endMatdTime) <= 0) {
LOG.info("[" + job.getId() + "]: all actions have been materialized, set pending to true");
// set doneMaterialization to true when materialization is done
job.setDoneMaterialization();
}
job.setStatus(StatusUtils.getStatus(job));
LOG.info("Coord Job status updated to = " + job.getStatus());
job.setNextMaterializedTime(endMatdTime);
}
@Override
public String getKey() {
return getName() + "_" + jobId;
}
@Override
public void notifyParent() throws CommandException {
// update bundle action only when status changes in coord job
if (this.coordJob.getBundleId() != null) {
if (!prevStatus.equals(coordJob.getStatus())) {
BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
bundleStatusUpdate.call();
}
}
}
}