// blob: 72d1bfc39570c75d87ed1d5337adaac7fb0f6891 [file] [log] [blame]
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*******************************************************************************/
package org.ofbiz.service.job;
import java.io.IOException;
import java.sql.Timestamp;
import com.ibm.icu.util.Calendar;
import java.util.Date;
import java.util.Map;
import javax.xml.parsers.ParserConfigurationException;
import javolution.util.FastMap;
import org.ofbiz.base.util.Debug;
import org.ofbiz.base.util.UtilDateTime;
import org.ofbiz.base.util.UtilGenerics;
import org.ofbiz.base.util.UtilProperties;
import org.ofbiz.base.util.UtilValidate;
import org.ofbiz.service.calendar.TemporalExpression;
import org.ofbiz.service.calendar.TemporalExpressionWorker;
import org.ofbiz.entity.DelegatorFactory;
import org.ofbiz.entity.GenericDelegator;
import org.ofbiz.entity.GenericEntityException;
import org.ofbiz.entity.GenericValue;
import org.ofbiz.entity.condition.EntityCondition;
import org.ofbiz.entity.condition.EntityFieldMap;
import org.ofbiz.entity.serialize.SerializeException;
import org.ofbiz.entity.serialize.XmlSerializer;
import org.ofbiz.service.DispatchContext;
import org.ofbiz.service.GenericRequester;
import org.ofbiz.service.ServiceUtil;
import org.ofbiz.service.ThreadContext;
import org.ofbiz.service.calendar.RecurrenceInfo;
import org.ofbiz.service.config.ServiceConfigUtil;
import org.xml.sax.SAXException;
/**
 * Entity Service Job - Store => Schedule => Run
 *
 * <p>A service job whose definition lives in a <code>JobSandbox</code> entity row.
 * The row is re-read from the delegator whenever the job needs it (see
 * {@link #getJob()}), so status changes made elsewhere are observed. Recurring
 * and retried runs are modelled by inserting a fresh <code>JobSandbox</code> row
 * that points back at this one (see {@link #createRecurrence(GenericValue, long)}).</p>
 */
@SuppressWarnings("serial")
public class PersistedServiceJob extends GenericServiceJob {

    public static final String module = PersistedServiceJob.class.getName();

    // Not serialized with the job; supplied again through a constructor.
    private transient GenericDelegator delegator = null;
    // The "runTime" value the job row carried when this object was built.
    private Timestamp storedDate = null;
    // Epoch millis of the follow-up job created by createRecurrence(), or -1 when none was created.
    private long nextRecurrence = -1;
    // Upper bound checked by canRetry(); -1 means "no limit".
    private long maxRetry = -1;
    // Ensures the RecurrenceInfo deprecation warning is emitted at most once per instance.
    private boolean warningLogged = false;

    /**
     * Creates a new PersistedServiceJob using the delegator of the dispatch context.
     *
     * @param dctx the dispatch context; its delegator is used for all entity access
     * @param jobValue the <code>JobSandbox</code> value; "jobId", "jobName", "runTime" and
     *        optionally "maxRetry" are read from it
     * @param req the requester associated with this job
     */
    public PersistedServiceJob(DispatchContext dctx, GenericValue jobValue, GenericRequester req) {
        this(dctx, jobValue, req, dctx.getDelegator());
    }

    /**
     * Creates a new PersistedServiceJob with an explicitly supplied delegator.
     *
     * @param dctx the dispatch context
     * @param jobValue the <code>JobSandbox</code> value; "jobId", "jobName", "runTime" and
     *        optionally "maxRetry" are read from it
     * @param req the requester associated with this job
     * @param delegator the delegator used for all entity access by this job
     */
    public PersistedServiceJob(DispatchContext dctx, GenericValue jobValue, GenericRequester req, GenericDelegator delegator) {
        super(jobValue.getString("jobId"), jobValue.getString("jobName"));
        this.delegator = delegator;
        this.requester = req;
        this.dctx = dctx;
        this.storedDate = jobValue.getTimestamp("runTime");
        this.runtime = storedDate.getTime();
        // A missing maxRetry is treated as "unlimited" (-1).
        this.maxRetry = jobValue.get("maxRetry") != null ? jobValue.getLong("maxRetry").longValue() : -1;
    }

    /**
     * Queues the job and marks its <code>JobSandbox</code> row as started.
     *
     * <p>After delegating to the superclass, the row is reloaded. If it already has a
     * "cancelDateTime" or "startDateTime" the job is rejected; otherwise "startDateTime"
     * is set to now and "statusId" to SERVICE_RUNNING. Whenever the job is rejected or
     * the row cannot be refreshed/stored, <code>runtime</code> is reset to -1.</p>
     *
     * @throws InvalidJobException if the row cannot be read, refreshed or stored, or if
     *         the job has already been started or cancelled
     */
    @Override
    public void queue() throws InvalidJobException {
        super.queue();

        // refresh the job object
        GenericValue jobValue = null;
        try {
            jobValue = this.getJob();
            jobValue.refresh();
        } catch (GenericEntityException e) {
            runtime = -1;
            throw new InvalidJobException("Unable to refresh Job object", e);
        }

        // make sure it isn't already set/cancelled
        if (runtime != -1) {
            Timestamp cancelTime = jobValue.getTimestamp("cancelDateTime");
            Timestamp startTime = jobValue.getTimestamp("startDateTime");
            if (cancelTime != null || startTime != null) {
                // job not available
                runtime = -1;
                throw new InvalidJobException("Job [" + getJobId() + "] is not available");
            }

            // set the start time to now
            jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
            jobValue.set("statusId", "SERVICE_RUNNING");
            try {
                jobValue.store();
            } catch (GenericEntityException e) {
                runtime = -1;
                throw new InvalidJobException("Unable to set the startDateTime on the current job [" + getJobId() + "]; not running!", e);
            }
        }
    }

    /**
     * Prepares the job for execution and schedules its next recurrence, if any.
     *
     * <p>The temporal expression is taken from the job's RecurrenceInfo when one exists,
     * otherwise from its "tempExprId". The job must have been accepted by this instance
     * ("runByInstanceId" equal to the configured <code>unique.instanceId</code>). When the
     * recurrence limit has not been exceeded, a follow-up job row is created for the next
     * time produced by the expression.</p>
     *
     * @throws InvalidJobException if the job row cannot be read or was accepted by a
     *         different instance
     * @throws RuntimeException if an entity error occurs while loading the temporal
     *         expression or creating the follow-up job; the original exception is the cause
     * @see org.ofbiz.service.job.GenericServiceJob#init()
     */
    @Override
    protected void init() throws InvalidJobException {
        super.init();

        // configure any additional recurrences
        GenericValue job = this.getJob();
        long maxRecurrenceCount = -1;
        long currentRecurrenceCount = 0;
        TemporalExpression expr = null;

        RecurrenceInfo recurrence = JobManager.getRecurrenceInfo(job);
        if (recurrence != null) {
            if (!this.warningLogged) {
                Debug.logWarning("Persisted Job [" + getJobId() + "] references a RecurrenceInfo, recommend using TemporalExpression instead", module);
                this.warningLogged = true;
            }
            currentRecurrenceCount = recurrence.getCurrentCount();
            expr = RecurrenceInfo.toTemporalExpression(recurrence);
        }

        // Fall back to the job's own temporal expression when no RecurrenceInfo supplied one.
        if (expr == null && UtilValidate.isNotEmpty(job.getString("tempExprId"))) {
            try {
                expr = TemporalExpressionWorker.getTemporalExpression(this.delegator, job.getString("tempExprId"));
            } catch (GenericEntityException e) {
                // keep the original exception as the cause so the stack trace is not lost
                throw new RuntimeException(e.getMessage(), e);
            }
        }

        String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0");
        if (!instanceId.equals(job.getString("runByInstanceId"))) {
            throw new InvalidJobException("Job has been accepted by a different instance!");
        }

        if (job.get("maxRecurrenceCount") != null) {
            maxRecurrenceCount = job.getLong("maxRecurrenceCount").longValue();
        }
        // A count stored on the job row takes precedence over the RecurrenceInfo count.
        if (job.get("currentRecurrenceCount") != null) {
            currentRecurrenceCount = job.getLong("currentRecurrenceCount").longValue();
        }
        if (maxRecurrenceCount != -1) {
            currentRecurrenceCount++;
            // Only the in-memory value is updated here; createRecurrence() copies it into the new row.
            job.set("currentRecurrenceCount", currentRecurrenceCount);
        }

        try {
            if (expr != null && (maxRecurrenceCount == -1 || currentRecurrenceCount <= maxRecurrenceCount)) {
                if (recurrence != null) {
                    recurrence.incrementCurrentCount();
                }
                Calendar next = expr.next(Calendar.getInstance());
                if (next != null) {
                    createRecurrence(job, next.getTimeInMillis());
                }
            }
        } catch (GenericEntityException e) {
            // keep the original exception as the cause so the stack trace is not lost
            throw new RuntimeException(e.getMessage(), e);
        }

        if (Debug.infoOn()) {
            // -1 means no follow-up job was created; avoid printing it as an epoch date
            String nextRun = nextRecurrence == -1 ? "none scheduled" : new Date(nextRecurrence).toString();
            Debug.logInfo(this.toString() + "[" + getJobId() + "] -- Next runtime: " + nextRun, module);
        }
    }

    /**
     * Creates a pending follow-up <code>JobSandbox</code> row that is a copy of the given job.
     *
     * <p>Nothing is created unless <code>next</code> is later than this job's runtime. The new
     * row gets a fresh sequence id, links back through "previousJobId"/"parentJobId", and has
     * its start time and accepting instance cleared. On creation <code>nextRecurrence</code>
     * is set to <code>next</code>.</p>
     *
     * @param job the job row to copy
     * @param next the run time of the new job, in epoch milliseconds
     * @throws GenericEntityException if the new row cannot be created
     */
    private void createRecurrence(GenericValue job, long next) throws GenericEntityException {
        if (Debug.verboseOn()) {
            Debug.logVerbose("Next runtime returned: " + next, module);
        }

        if (next > runtime) {
            // The first job in a chain is its own parent.
            String pJobId = job.getString("parentJobId");
            if (pJobId == null) {
                pJobId = job.getString("jobId");
            }

            GenericValue newJob = GenericValue.create(job);
            newJob.remove("jobId");
            newJob.set("previousJobId", job.getString("jobId"));
            newJob.set("parentJobId", pJobId);
            newJob.set("statusId", "SERVICE_PENDING");
            newJob.set("startDateTime", null);
            newJob.set("runByInstanceId", null);
            newJob.set("runTime", new java.sql.Timestamp(next));
            nextRecurrence = next;
            delegator.createSetNextSeqId(newJob);
            if (Debug.verboseOn()) {
                Debug.logVerbose("Created next job entry: " + newJob, module);
            }
        }
    }

    /**
     * Records successful completion on the job row.
     *
     * <p>Sets "finishDateTime" to now and, if the status is unset or still SERVICE_RUNNING,
     * changes it to SERVICE_FINISHED. A failure to store the row is logged, not propagated.</p>
     *
     * @throws InvalidJobException if the job row cannot be read
     * @see org.ofbiz.service.job.GenericServiceJob#finish()
     */
    @Override
    protected void finish() throws InvalidJobException {
        super.finish();

        // set the finish date
        GenericValue job = getJob();
        String status = job.getString("statusId");
        if (status == null || "SERVICE_RUNNING".equals(status)) {
            job.set("statusId", "SERVICE_FINISHED");
        }
        job.set("finishDateTime", UtilDateTime.nowTimestamp());
        try {
            job.store();
        } catch (GenericEntityException e) {
            Debug.logError(e, "Cannot update the job [" + getJobId() + "] sandbox", module);
        }
    }

    /**
     * Records a failed run on the job row and, when allowed, schedules a retry.
     *
     * <p>A retry is only attempted when no follow-up job has been created yet
     * (<code>nextRecurrence == -1</code>) and {@link #canRetry()} permits it. The retry is
     * scheduled <code>ServiceConfigUtil.getFailedRetryMin()</code> minutes from now. The row is
     * then marked SERVICE_FAILED with "finishDateTime" set to now; a failure to store the row
     * is logged, not propagated.</p>
     *
     * @param t the cause of the failure
     * @throws InvalidJobException if the job row cannot be read
     * @see org.ofbiz.service.job.GenericServiceJob#failed(Throwable)
     */
    @Override
    protected void failed(Throwable t) throws InvalidJobException {
        super.failed(t);

        GenericValue job = getJob();
        // if the job has not been re-scheduled; we need to re-schedule and run again
        if (nextRecurrence == -1) {
            if (this.canRetry()) {
                // create a recurrence
                Calendar cal = Calendar.getInstance();
                cal.setTime(new Date());
                cal.add(Calendar.MINUTE, ServiceConfigUtil.getFailedRetryMin());
                long next = cal.getTimeInMillis();
                try {
                    createRecurrence(job, next);
                    // only report the re-schedule once the call has not thrown
                    Debug.log("Persisted Job [" + getJobId() + "] Failed Re-Scheduling : " + next, module);
                } catch (GenericEntityException gee) {
                    Debug.logError(gee, "ERROR: Unable to re-schedule job [" + getJobId() + "] to re-run : " + job, module);
                }
            } else {
                Debug.logWarning("Persisted Job [" + getJobId() + "] Failed - Max Retry Hit; not re-scheduling", module);
            }
        }

        // set the failed status
        job.set("statusId", "SERVICE_FAILED");
        job.set("finishDateTime", UtilDateTime.nowTimestamp());
        try {
            job.store();
        } catch (GenericEntityException e) {
            Debug.logError(e, "Cannot update the job sandbox", module);
        }
    }

    /**
     * Returns the "serviceName" stored on the job row.
     *
     * @return the service name, or <code>null</code> when the row has none
     * @throws InvalidJobException if the job row cannot be read
     * @see org.ofbiz.service.job.GenericServiceJob#getServiceName()
     */
    @Override
    protected String getServiceName() throws InvalidJobException {
        GenericValue jobObj = getJob();
        if (jobObj == null || jobObj.get("serviceName") == null) {
            return null;
        }
        return jobObj.getString("serviceName");
    }

    /**
     * Builds the service context for this job and initializes the {@link ThreadContext}.
     *
     * <p>When the job has a "runtimeDataId", the related RuntimeData's "runtimeInfo" is
     * deserialized into the context map; otherwise an empty map is used. If "runAsUser" is
     * set, the matching user login is placed under "userLogin". Any exception raised along
     * the way is logged and the context built so far (possibly <code>null</code>) is returned.</p>
     *
     * @return the service context, or <code>null</code> if it could not be built
     * @throws InvalidJobException declared by the overridden method; failures inside this
     *         implementation are logged rather than propagated
     * @see org.ofbiz.service.job.GenericServiceJob#getContext()
     */
    @Override
    protected Map<String, Object> getContext() throws InvalidJobException {
        Map<String, Object> context = null;
        try {
            GenericValue jobObj = getJob();
            if (UtilValidate.isNotEmpty(jobObj.getString("runtimeDataId"))) {
                GenericValue contextObj = jobObj.getRelatedOne("RuntimeData");
                if (contextObj != null) {
                    context = UtilGenerics.checkMap(XmlSerializer.deserialize(contextObj.getString("runtimeInfo"), delegator), String.class, Object.class);
                }
            }

            if (context == null) {
                context = FastMap.newInstance();
            }

            // check the runAsUser
            if (!UtilValidate.isEmpty(jobObj.get("runAsUser"))) {
                context.put("userLogin", ServiceUtil.getUserLogin(dctx, context, jobObj.getString("runAsUser")));
            }

            // Start from a clean thread context, then bind delegator, dispatcher and context to it.
            ThreadContext.reset();
            GenericDelegator newDelegator = DelegatorFactory.getGenericDelegator(this.delegator.getDelegatorName());
            ThreadContext.setDelegator(newDelegator);
            ThreadContext.setDispatcher(this.dctx.getDispatcher());
            ThreadContext.initializeContext(context);
        } catch (GenericEntityException e) {
            Debug.logError(e, "PersistedServiceJob.getContext(): Entity Exception", module);
        } catch (SerializeException e) {
            Debug.logError(e, "PersistedServiceJob.getContext(): Serialize Exception", module);
        } catch (ParserConfigurationException e) {
            Debug.logError(e, "PersistedServiceJob.getContext(): Parse Exception", module);
        } catch (SAXException e) {
            Debug.logError(e, "PersistedServiceJob.getContext(): SAXException", module);
        } catch (IOException e) {
            Debug.logError(e, "PersistedServiceJob.getContext(): IOException", module);
        } catch (Exception e) {
            // deliberately broad: this also covers InvalidJobException from getJob()
            Debug.logError(e, "PersistedServiceJob.getContext(): Exception ", module);
        }

        if (context == null) {
            Debug.logError("Job context is null", module);
        }
        return context;
    }

    /**
     * Loads this job's <code>JobSandbox</code> row from the delegator, bypassing the cache.
     *
     * @return the job row, never <code>null</code>
     * @throws InvalidJobException if the row does not exist or the lookup fails
     */
    private GenericValue getJob() throws InvalidJobException {
        try {
            GenericValue jobObj = delegator.findOne("JobSandbox", false, "jobId", getJobId());
            if (jobObj == null) {
                throw new InvalidJobException("Job [" + getJobId() + "] came back null from datasource");
            }
            return jobObj;
        } catch (GenericEntityException e) {
            throw new InvalidJobException("Cannot get job definition [" + getJobId() + "] from entity", e);
        }
    }

    /**
     * Returns the number of retries made so far for this job's chain.
     *
     * <p>A job without a "parentJobId" has had none. Otherwise the result is the number of
     * SERVICE_FAILED rows sharing the parent id, plus one for the parent itself. If the count
     * query fails the error is logged and the count is treated as zero (result 1).</p>
     *
     * @return the retry count
     * @throws InvalidJobException if the job row cannot be read
     */
    private long getRetries() throws InvalidJobException {
        GenericValue job = this.getJob();
        String pJobId = job.getString("parentJobId");
        if (pJobId == null) {
            return 0;
        }

        long count = 0;
        try {
            EntityFieldMap ecl = EntityCondition.makeConditionMap("parentJobId", pJobId, "statusId", "SERVICE_FAILED");
            count = delegator.findCountByCondition("JobSandbox", ecl, null, null);
        } catch (GenericEntityException e) {
            Debug.logError(e, module);
        }

        return count + 1; // add one for the parent
    }

    /**
     * Tells whether another retry may be scheduled.
     *
     * @return <code>true</code> when retries are unlimited (<code>maxRetry == -1</code>) or the
     *         number of retries so far is below <code>maxRetry</code>
     * @throws InvalidJobException if the job row cannot be read
     */
    private boolean canRetry() throws InvalidJobException {
        // short-circuit keeps the entity lookup in getRetries() out of the unlimited case
        return maxRetry == -1 || this.getRetries() < maxRetry;
    }
}