| /******************************************************************************* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| *******************************************************************************/ |
| package org.ofbiz.service.job; |
| |
| import java.io.IOException; |
| import java.sql.Timestamp; |
| import java.util.Date; |
| import java.util.Map; |
| |
| import javax.xml.parsers.ParserConfigurationException; |
| |
| import javolution.util.FastMap; |
| |
| import org.apache.commons.lang.StringUtils; |
| import org.ofbiz.base.config.GenericConfigException; |
| import org.ofbiz.base.util.Debug; |
| import org.ofbiz.base.util.UtilDateTime; |
| import org.ofbiz.base.util.UtilGenerics; |
| import org.ofbiz.base.util.UtilValidate; |
| import org.ofbiz.entity.Delegator; |
| import org.ofbiz.entity.GenericEntityException; |
| import org.ofbiz.entity.GenericValue; |
| import org.ofbiz.entity.condition.EntityCondition; |
| import org.ofbiz.entity.condition.EntityFieldMap; |
| import org.ofbiz.entity.serialize.SerializeException; |
| import org.ofbiz.entity.serialize.XmlSerializer; |
| import org.ofbiz.service.DispatchContext; |
| import org.ofbiz.service.GenericRequester; |
| import org.ofbiz.service.ServiceUtil; |
| import org.ofbiz.service.calendar.RecurrenceInfo; |
| import org.ofbiz.service.calendar.RecurrenceInfoException; |
| import org.ofbiz.service.calendar.TemporalExpression; |
| import org.ofbiz.service.calendar.TemporalExpressionWorker; |
| import org.ofbiz.service.config.ServiceConfigUtil; |
| import org.xml.sax.SAXException; |
| |
| import com.ibm.icu.util.Calendar; |
| |
| /** |
| * A {@link Job} that is backed by the entity engine. Job data is stored |
| * in the JobSandbox entity. |
| * <p>When the job is queued, this object "owns" the entity value. Any external changes |
| * are ignored except the cancelDateTime field - jobs can be canceled after they are queued.</p> |
| */ |
| @SuppressWarnings("serial") |
| public class PersistedServiceJob extends GenericServiceJob { |
| |
| public static final String module = PersistedServiceJob.class.getName(); |
| |
| private final transient Delegator delegator; |
| private long nextRecurrence = -1; |
| private final long maxRetry; |
| private final long currentRetryCount; |
| private final GenericValue jobValue; |
| private final long startTime; |
| |
| /** |
| * Creates a new PersistedServiceJob |
| * @param dctx |
| * @param jobValue |
| * @param req |
| */ |
| public PersistedServiceJob(DispatchContext dctx, GenericValue jobValue, GenericRequester req) { |
| super(dctx, jobValue.getString("jobId"), jobValue.getString("jobName"), null, null, req); |
| this.delegator = dctx.getDelegator(); |
| this.jobValue = jobValue; |
| Timestamp storedDate = jobValue.getTimestamp("runTime"); |
| this.startTime = storedDate.getTime(); |
| this.maxRetry = jobValue.get("maxRetry") != null ? jobValue.getLong("maxRetry").longValue() : -1; |
| Long retryCount = jobValue.getLong("currentRetryCount"); |
| if (retryCount != null) { |
| this.currentRetryCount = retryCount.longValue(); |
| } else { |
| // backward compatibility |
| this.currentRetryCount = getRetries(this.delegator); |
| } |
| } |
| |
| @Override |
| public void queue() throws InvalidJobException { |
| super.queue(); |
| try { |
| jobValue.refresh(); |
| } catch (GenericEntityException e) { |
| throw new InvalidJobException("Unable to refresh JobSandbox value", e); |
| } |
| if (!JobManager.instanceId.equals(jobValue.getString("runByInstanceId"))) { |
| throw new InvalidJobException("Job has been accepted by a different instance"); |
| } |
| Timestamp cancelTime = jobValue.getTimestamp("cancelDateTime"); |
| Timestamp startTime = jobValue.getTimestamp("startDateTime"); |
| if (cancelTime != null || startTime != null) { |
| // job not available |
| throw new InvalidJobException("Job [" + getJobId() + "] is not available"); |
| } else { |
| jobValue.set("statusId", "SERVICE_QUEUED"); |
| try { |
| jobValue.store(); |
| } catch (GenericEntityException e) { |
| throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "]; not running!", e); |
| } |
| if (Debug.verboseOn()) { |
| Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module); |
| } |
| } |
| } |
| |
| @Override |
| protected void init() throws InvalidJobException { |
| super.init(); |
| try { |
| jobValue.refresh(); |
| } catch (GenericEntityException e) { |
| throw new InvalidJobException("Unable to refresh JobSandbox value", e); |
| } |
| if (!JobManager.instanceId.equals(jobValue.getString("runByInstanceId"))) { |
| throw new InvalidJobException("Job has been accepted by a different instance"); |
| } |
| if (jobValue.getTimestamp("cancelDateTime") != null) { |
| // Job cancelled |
| throw new InvalidJobException("Job [" + getJobId() + "] was cancelled"); |
| } |
| jobValue.set("startDateTime", UtilDateTime.nowTimestamp()); |
| jobValue.set("statusId", "SERVICE_RUNNING"); |
| try { |
| jobValue.store(); |
| } catch (GenericEntityException e) { |
| throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "]; not running!", e); |
| } |
| if (Debug.verboseOn()) { |
| Debug.logVerbose("Job [" + getJobId() + "] running", module); |
| } |
| // configure any additional recurrences |
| long maxRecurrenceCount = -1; |
| long currentRecurrenceCount = 0; |
| TemporalExpression expr = null; |
| RecurrenceInfo recurrence = getRecurrenceInfo(); |
| if (recurrence != null) { |
| Debug.logWarning("Persisted Job [" + getJobId() + "] references a RecurrenceInfo, recommend using TemporalExpression instead", module); |
| currentRecurrenceCount = recurrence.getCurrentCount(); |
| expr = RecurrenceInfo.toTemporalExpression(recurrence); |
| } |
| if (expr == null && UtilValidate.isNotEmpty(jobValue.getString("tempExprId"))) { |
| try { |
| expr = TemporalExpressionWorker.getTemporalExpression(this.delegator, jobValue.getString("tempExprId")); |
| } catch (GenericEntityException e) { |
| throw new RuntimeException(e.getMessage()); |
| } |
| } |
| if (jobValue.get("maxRecurrenceCount") != null) { |
| maxRecurrenceCount = jobValue.getLong("maxRecurrenceCount").longValue(); |
| } |
| if (jobValue.get("currentRecurrenceCount") != null) { |
| currentRecurrenceCount = jobValue.getLong("currentRecurrenceCount").longValue(); |
| } |
| if (maxRecurrenceCount != -1) { |
| currentRecurrenceCount++; |
| jobValue.set("currentRecurrenceCount", currentRecurrenceCount); |
| } |
| try { |
| if (expr != null && (maxRecurrenceCount == -1 || currentRecurrenceCount <= maxRecurrenceCount)) { |
| if (recurrence != null) { |
| recurrence.incrementCurrentCount(); |
| } |
| Calendar next = expr.next(Calendar.getInstance()); |
| if (next != null) { |
| createRecurrence(next.getTimeInMillis(), false); |
| } |
| } |
| } catch (GenericEntityException e) { |
| throw new InvalidJobException(e); |
| } |
| if (Debug.infoOn()) Debug.logInfo("Job [" + getJobName() + "] Id [" + getJobId() + "] -- Next runtime: " + new Date(nextRecurrence), module); |
| } |
| |
| private void createRecurrence(long next, boolean isRetryOnFailure) throws GenericEntityException { |
| if (Debug.verboseOn()) Debug.logVerbose("Next runtime returned: " + next, module); |
| if (next > startTime) { |
| String pJobId = jobValue.getString("parentJobId"); |
| if (pJobId == null) { |
| pJobId = jobValue.getString("jobId"); |
| } |
| GenericValue newJob = GenericValue.create(jobValue); |
| newJob.remove("jobId"); |
| newJob.set("previousJobId", jobValue.getString("jobId")); |
| newJob.set("parentJobId", pJobId); |
| newJob.set("statusId", "SERVICE_PENDING"); |
| newJob.set("startDateTime", null); |
| newJob.set("runByInstanceId", null); |
| newJob.set("runTime", new java.sql.Timestamp(next)); |
| if (isRetryOnFailure) { |
| newJob.set("currentRetryCount", new Long(currentRetryCount + 1)); |
| } else { |
| newJob.set("currentRetryCount", new Long(0)); |
| } |
| nextRecurrence = next; |
| delegator.createSetNextSeqId(newJob); |
| if (Debug.verboseOn()) Debug.logVerbose("Created next job entry: " + newJob, module); |
| } |
| } |
| |
| @Override |
| protected void finish(Map<String, Object> result) throws InvalidJobException { |
| super.finish(result); |
| // set the finish date |
| jobValue.set("statusId", "SERVICE_FINISHED"); |
| jobValue.set("finishDateTime", UtilDateTime.nowTimestamp()); |
| String jobResult = null; |
| if (ServiceUtil.isError(result)) { |
| jobResult = StringUtils.substring(ServiceUtil.getErrorMessage(result), 0, 255); |
| } else { |
| jobResult = StringUtils.substring(ServiceUtil.makeSuccessMessage(result, "", "", "", ""), 0, 255); |
| } |
| if (UtilValidate.isNotEmpty(jobResult)) { |
| jobValue.set("jobResult", jobResult); |
| } |
| try { |
| jobValue.store(); |
| } catch (GenericEntityException e) { |
| Debug.logError(e, "Cannot update the job [" + getJobId() + "] sandbox", module); |
| } |
| } |
| |
| @Override |
| protected void failed(Throwable t) throws InvalidJobException { |
| super.failed(t); |
| // if the job has not been re-scheduled; we need to re-schedule and run again |
| if (nextRecurrence == -1) { |
| if (this.canRetry()) { |
| // create a recurrence |
| Calendar cal = Calendar.getInstance(); |
| try { |
| cal.add(Calendar.MINUTE, ServiceConfigUtil.getServiceEngine().getThreadPool().getFailedRetryMin()); |
| } catch (GenericConfigException e) { |
| Debug.logWarning(e, "Unable to get retry minutes for job [" + getJobId() + "], defaulting to now: ", module); |
| } |
| long next = cal.getTimeInMillis(); |
| try { |
| createRecurrence(next, true); |
| } catch (GenericEntityException e) { |
| Debug.logError(e, "Unable to re-schedule job [" + getJobId() + "]: ", module); |
| } |
| Debug.logInfo("Persisted Job [" + getJobId() + "] Failed. Re-Scheduling : " + next, module); |
| } else { |
| Debug.logWarning("Persisted Job [" + getJobId() + "] Failed. Max Retry Hit, not re-scheduling", module); |
| } |
| } |
| // set the failed status |
| jobValue.set("statusId", "SERVICE_FAILED"); |
| jobValue.set("finishDateTime", UtilDateTime.nowTimestamp()); |
| jobValue.set("jobResult", StringUtils.substring(t.getMessage(), 0, 255)); |
| try { |
| jobValue.store(); |
| } catch (GenericEntityException e) { |
| Debug.logError(e, "Cannot update the JobSandbox entity", module); |
| } |
| } |
| |
| @Override |
| protected String getServiceName() { |
| if (jobValue == null || jobValue.get("serviceName") == null) { |
| return null; |
| } |
| return jobValue.getString("serviceName"); |
| } |
| |
| @Override |
| protected Map<String, Object> getContext() throws InvalidJobException { |
| Map<String, Object> context = null; |
| try { |
| if (!UtilValidate.isEmpty(jobValue.getString("runtimeDataId"))) { |
| GenericValue contextObj = jobValue.getRelatedOne("RuntimeData", false); |
| if (contextObj != null) { |
| context = UtilGenerics.checkMap(XmlSerializer.deserialize(contextObj.getString("runtimeInfo"), delegator), String.class, Object.class); |
| } |
| } |
| if (context == null) { |
| context = FastMap.newInstance(); |
| } |
| // check the runAsUser |
| if (!UtilValidate.isEmpty(jobValue.getString("runAsUser"))) { |
| context.put("userLogin", ServiceUtil.getUserLogin(dctx, context, jobValue.getString("runAsUser"))); |
| } |
| } catch (GenericEntityException e) { |
| Debug.logError(e, "PersistedServiceJob.getContext(): Entity Exception", module); |
| } catch (SerializeException e) { |
| Debug.logError(e, "PersistedServiceJob.getContext(): Serialize Exception", module); |
| } catch (ParserConfigurationException e) { |
| Debug.logError(e, "PersistedServiceJob.getContext(): Parse Exception", module); |
| } catch (SAXException e) { |
| Debug.logError(e, "PersistedServiceJob.getContext(): SAXException", module); |
| } catch (IOException e) { |
| Debug.logError(e, "PersistedServiceJob.getContext(): IOException", module); |
| } |
| if (context == null) { |
| Debug.logError("Job context is null", module); |
| } |
| return context; |
| } |
| |
| // returns the number of current retries |
| private long getRetries(Delegator delegator) { |
| String pJobId = jobValue.getString("parentJobId"); |
| if (pJobId == null) { |
| return 0; |
| } |
| long count = 0; |
| try { |
| EntityFieldMap ecl = EntityCondition.makeConditionMap("parentJobId", pJobId, "statusId", "SERVICE_FAILED"); |
| count = delegator.findCountByCondition("JobSandbox", ecl, null, null); |
| } catch (GenericEntityException e) { |
| Debug.logError(e, "Exception thrown while counting retries: ", module); |
| } |
| return count + 1; // add one for the parent |
| } |
| |
| private boolean canRetry() { |
| if (maxRetry == -1) { |
| return true; |
| } |
| return currentRetryCount < maxRetry; |
| } |
| |
| private RecurrenceInfo getRecurrenceInfo() { |
| try { |
| if (UtilValidate.isNotEmpty(jobValue.getString("recurrenceInfoId"))) { |
| GenericValue ri = jobValue.getRelatedOne("RecurrenceInfo", false); |
| if (ri != null) { |
| return new RecurrenceInfo(ri); |
| } |
| } |
| } catch (GenericEntityException e) { |
| Debug.logError(e, "Problem getting RecurrenceInfo entity from JobSandbox", module); |
| } catch (RecurrenceInfoException re) { |
| Debug.logError(re, "Problem creating RecurrenceInfo instance: " + re.getMessage(), module); |
| } |
| return null; |
| } |
| |
| @Override |
| public void deQueue() throws InvalidJobException { |
| if (currentState != State.QUEUED) { |
| throw new InvalidJobException("Illegal state change"); |
| } |
| currentState = State.CREATED; |
| try { |
| jobValue.refresh(); |
| jobValue.set("startDateTime", null); |
| jobValue.set("runByInstanceId", null); |
| jobValue.set("statusId", "SERVICE_PENDING"); |
| jobValue.store(); |
| } catch (GenericEntityException e) { |
| throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e); |
| } |
| if (Debug.verboseOn()) { |
| Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module); |
| } |
| } |
| |
| @Override |
| public Date getStartTime() { |
| return new Date(startTime); |
| } |
| } |