// blob: b5508377d918fabf11882171ea27704c06a1b594 [file] [log] [blame]
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*******************************************************************************/
package org.ofbiz.service.job;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Date;
import java.util.List;
import java.util.Map;
import javolution.util.FastList;
import javolution.util.FastMap;
import org.ofbiz.base.util.Debug;
import org.ofbiz.base.util.GeneralRuntimeException;
import org.ofbiz.base.util.UtilDateTime;
import org.ofbiz.base.util.UtilMisc;
import org.ofbiz.base.util.UtilProperties;
import org.ofbiz.base.util.UtilValidate;
import org.ofbiz.entity.Delegator;
import org.ofbiz.entity.GenericEntityException;
import org.ofbiz.entity.GenericValue;
import org.ofbiz.entity.condition.EntityCondition;
import org.ofbiz.entity.condition.EntityConditionList;
import org.ofbiz.entity.condition.EntityExpr;
import org.ofbiz.entity.condition.EntityOperator;
import org.ofbiz.entity.serialize.SerializeException;
import org.ofbiz.entity.serialize.XmlSerializer;
import org.ofbiz.entity.transaction.GenericTransactionException;
import org.ofbiz.entity.transaction.TransactionUtil;
import org.ofbiz.service.DispatchContext;
import org.ofbiz.service.GenericDispatcher;
import org.ofbiz.service.LocalDispatcher;
import org.ofbiz.service.calendar.RecurrenceInfo;
import org.ofbiz.service.calendar.RecurrenceInfoException;
import org.ofbiz.service.config.ServiceConfigUtil;
/**
 * JobManager: creates persisted service jobs and hands due jobs to a {@link JobPoller}.
 *
 * <p>Jobs are stored as <code>JobSandbox</code> entity values. {@link #poll()} marks due jobs with
 * this instance's id and turns them into {@link Job} objects, {@link #reloadCrashedJobs()}
 * re-creates jobs that are still flagged as running for this instance, and the
 * <code>schedule</code> overloads create new <code>JobSandbox</code> records.</p>
 *
 * <p>One manager is kept per delegator name in {@link #registeredManagers}; the constructor
 * refuses to create a second manager for a name that is already registered.</p>
 */
public class JobManager {

    /** Id of this instance, read from <code>unique.instanceId</code> in general.properties (default "ofbiz0"). */
    public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0");

    /** Field values written to (and later used to look up) the JobSandbox records claimed in {@link #poll()}. */
    public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, "statusId", "SERVICE_QUEUED");

    public static final String module = JobManager.class.getName();

    /** Created managers, keyed by delegator name. */
    public static Map<String, JobManager> registeredManagers = FastMap.newInstance();

    protected Delegator delegator;
    protected JobPoller jp;

    /**
     * Creates a new JobManager object with the poller enabled.
     *
     * @param delegator the delegator used for all entity access; must not be null
     */
    public JobManager(Delegator delegator) {
        this(delegator, true);
    }

    /**
     * Creates a new JobManager object, starts its {@link JobPoller} and registers the manager
     * under the delegator's name.
     *
     * @param delegator the delegator used for all entity access; must not be null
     * @param enabled passed through to the {@link JobPoller} constructor
     * @throws GeneralRuntimeException if the delegator is null or a manager is already
     *         registered for the delegator's name
     */
    public JobManager(Delegator delegator, boolean enabled) {
        if (delegator == null) {
            throw new GeneralRuntimeException("ERROR: null delegator passed, cannot create JobManager");
        }
        if (JobManager.registeredManagers.get(delegator.getDelegatorName()) != null) {
            throw new GeneralRuntimeException("JobManager for [" + delegator.getDelegatorName() + "] already running");
        }
        this.delegator = delegator;
        jp = new JobPoller(this, enabled);
        JobManager.registeredManagers.put(delegator.getDelegatorName(), this);
    }

    /**
     * Returns the manager registered for the delegator's name, creating one if none exists.
     *
     * <p>NOTE(review): the lookup and the creation are not synchronized here, so two concurrent
     * first calls for the same delegator could race; confirm callers serialize this.</p>
     *
     * @param delegator the delegator whose manager is wanted
     * @param enabled only used when a new manager has to be created
     * @return the existing or newly created manager
     */
    public static JobManager getInstance(Delegator delegator, boolean enabled) {
        JobManager jm = JobManager.registeredManagers.get(delegator.getDelegatorName());
        if (jm == null) {
            jm = new JobManager(delegator, enabled);
        }
        return jm;
    }

    /**
     * Queues a Job to run now. Jobs for which {@link Job#isValid()} is false are ignored.
     *
     * @param job the job to queue
     * @throws JobManagerException declared for callers; not thrown by this implementation
     */
    public void runJob(Job job) throws JobManagerException {
        if (job.isValid()) {
            jp.queueNow(job);
        }
    }

    /** Returns the LocalDispatcher looked up by this manager's delegator name. */
    public LocalDispatcher getDispatcher() {
        return GenericDispatcher.getLocalDispatcher(delegator.getDelegatorName(), delegator);
    }

    /** Returns the Delegator. */
    public Delegator getDelegator() {
        return this.delegator;
    }

    /**
     * Claims due jobs for this instance and returns them as queued {@link Job} objects.
     *
     * <p>Each pass runs in its own transaction: matching JobSandbox records (due, not started, not
     * cancelled, not claimed, and in one of the configured run pools or in no pool) are updated
     * with {@link #updateFields}, then every record carrying those field values is read back and
     * queued. Passes repeat until a pass finds no records.</p>
     *
     * <p>If a pass fails, its transaction is rolled back and polling stops for this call; jobs
     * collected by earlier passes are still returned. The same applies when a transaction cannot
     * be started by this call.</p>
     *
     * @return the jobs queued by this call; never null, possibly empty
     */
    public synchronized List<Job> poll() {
        List<Job> poll = FastList.newInstance();

        // sort the results by time
        List<String> order = UtilMisc.toList("runTime");

        // basic query: due, not started, not cancelled, not claimed by any instance
        List<EntityExpr> expressions = UtilMisc.toList(
                EntityCondition.makeCondition("runTime", EntityOperator.LESS_THAN_EQUAL_TO, UtilDateTime.nowTimestamp()),
                EntityCondition.makeCondition("startDateTime", EntityOperator.EQUALS, null),
                EntityCondition.makeCondition("cancelDateTime", EntityOperator.EQUALS, null),
                EntityCondition.makeCondition("runByInstanceId", EntityOperator.EQUALS, null));

        // limit to just defined pools (jobs without a pool always qualify)
        List<String> pools = ServiceConfigUtil.getRunPools();
        List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null));
        if (pools != null) {
            for (String poolName : pools) {
                poolsExpr.add(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, poolName));
            }
        }

        // make the conditions
        EntityCondition baseCondition = EntityCondition.makeCondition(expressions);
        EntityCondition poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR);
        EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(baseCondition, poolCondition));

        // we will loop until we have no more to do; the method-level lock is held throughout
        boolean pollDone = false;
        while (!pollDone) {
            boolean beganTransaction = false;
            try {
                beganTransaction = TransactionUtil.begin();
                if (!beganTransaction) {
                    Debug.logError("Unable to poll for jobs; transaction was not started by this process", module);
                    // hand back whatever earlier passes already queued instead of dropping it
                    return poll;
                }

                List<Job> localPoll = FastList.newInstance();

                // first update the jobs w/ this instance running information
                delegator.storeByCondition("JobSandbox", updateFields, mainCondition);

                // now query all the 'queued' jobs for this instance
                List<GenericValue> jobEnt = delegator.findByAnd("JobSandbox", updateFields, order);

                if (UtilValidate.isNotEmpty(jobEnt)) {
                    for (GenericValue v : jobEnt) {
                        DispatchContext dctx = getDispatcher().getDispatchContext();
                        if (dctx == null) {
                            Debug.logError("Unable to locate DispatchContext object; not running job!", module);
                            continue;
                        }
                        Job job = new PersistedServiceJob(dctx, v, null); // TODO fix the requester
                        try {
                            job.queue();
                            localPoll.add(job);
                        } catch (InvalidJobException e) {
                            Debug.logError(e, module);
                        }
                    }
                } else {
                    pollDone = true;
                }

                // nothing should go wrong at this point, so add to the general list
                poll.addAll(localPoll);
            } catch (Throwable t) {
                // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
                String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction.";
                Debug.logError(t, errMsg, module);
                try {
                    // only rollback the transaction if we started one...
                    TransactionUtil.rollback(beganTransaction, errMsg, t);
                } catch (GenericEntityException e2) {
                    Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
                }
                // stop this poll cycle; retrying immediately would spin on a persistent failure
                pollDone = true;
            } finally {
                try {
                    // only commit the transaction if we started one... but make sure we try
                    TransactionUtil.commit(beganTransaction);
                } catch (GenericTransactionException e) {
                    String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString();
                    // we don't really want to do anything different, so just log and move on
                    Debug.logError(e, errMsg, module);
                }
            }
        }
        return poll;
    }

    /**
     * Re-schedules jobs that are still marked <code>SERVICE_RUNNING</code> for this instance.
     *
     * <p>For each such JobSandbox record a copy is created with status
     * <code>SERVICE_PENDING</code>, a run time of now, and links back to the old job
     * (<code>previousJobId</code>, <code>parentJobId</code>); the old record is then marked
     * <code>SERVICE_CRASHED</code> with its cancel time set to the same instant. Entity errors
     * are logged, not thrown.</p>
     */
    public synchronized void reloadCrashedJobs() {
        // use the same instance id that poll() stamps on the jobs it claims
        List<EntityExpr> exprs = UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId));
        exprs.add(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
        EntityConditionList<EntityExpr> ecl = EntityCondition.makeCondition(exprs);

        List<GenericValue> crashed;
        try {
            crashed = delegator.findList("JobSandbox", ecl, null, UtilMisc.toList("startDateTime"), null, false);
        } catch (GenericEntityException e) {
            Debug.logError(e, "Unable to load crashed jobs", module);
            // the lookup failed, so nothing is known about crashed jobs; do not report "none"
            return;
        }

        if (UtilValidate.isEmpty(crashed)) {
            if (Debug.infoOn()) Debug.logInfo("No crashed jobs to re-schedule", module);
            return;
        }

        try {
            int rescheduled = 0;
            for (GenericValue job : crashed) {
                Timestamp now = UtilDateTime.nowTimestamp();
                Debug.log("Scheduling Job : " + job, module);

                // the first job in a chain is its own parent
                String pJobId = job.getString("parentJobId");
                if (pJobId == null) {
                    pJobId = job.getString("jobId");
                }

                GenericValue newJob = GenericValue.create(job);
                newJob.set("statusId", "SERVICE_PENDING");
                newJob.set("runTime", now);
                newJob.set("previousJobId", job.getString("jobId"));
                newJob.set("parentJobId", pJobId);
                newJob.set("startDateTime", null);
                newJob.set("runByInstanceId", null);
                delegator.createSetNextSeqId(newJob);

                // set the cancel time on the old job to the same as the re-schedule time
                job.set("statusId", "SERVICE_CRASHED");
                job.set("cancelDateTime", now);
                delegator.store(job);

                rescheduled++;
            }
            if (Debug.infoOn()) Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
        } catch (GenericEntityException e) {
            Debug.logError(e, module);
        }
    }

    /**
     * Schedule a job to start at a specific time with specific recurrence info
     *@param serviceName The name of the service to invoke
     *@param context The context for the service
     *@param startTime The time in milliseconds the service should run
     *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
     *@param interval The interval of the frequency recurrence
     *@param count The number of times to repeat
     *@throws JobManagerException if the context or the job cannot be persisted
     */
    public void schedule(String serviceName, Map<String, ? extends Object> context, long startTime, int frequency, int interval, int count) throws JobManagerException {
        schedule(serviceName, context, startTime, frequency, interval, count, 0);
    }

    /**
     * Schedule a job to start at a specific time with specific recurrence info
     *@param serviceName The name of the service to invoke
     *@param context The context for the service
     *@param startTime The time in milliseconds the service should run
     *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
     *@param interval The interval of the frequency recurrence
     *@param endTime The time in milliseconds the service should expire
     *@throws JobManagerException if the context or the job cannot be persisted
     */
    public void schedule(String serviceName, Map<String, ? extends Object> context, long startTime, int frequency, int interval, long endTime) throws JobManagerException {
        // a count of -1 is passed on when only an end time is given
        schedule(serviceName, context, startTime, frequency, interval, -1, endTime);
    }

    /**
     * Schedule a job to start at a specific time with specific recurrence info
     *@param serviceName The name of the service to invoke
     *@param context The context for the service
     *@param startTime The time in milliseconds the service should run
     *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
     *@param interval The interval of the frequency recurrence
     *@param count The number of times to repeat
     *@param endTime The time in milliseconds the service should expire
     *@throws JobManagerException if the context or the job cannot be persisted
     */
    public void schedule(String serviceName, Map<String, ? extends Object> context, long startTime, int frequency, int interval, int count, long endTime) throws JobManagerException {
        schedule(null, serviceName, context, startTime, frequency, interval, count, endTime);
    }

    /**
     * Schedule a job to start at a specific time with specific recurrence info
     *@param poolName The name of the pool to run the service from
     *@param serviceName The name of the service to invoke
     *@param context The context for the service
     *@param startTime The time in milliseconds the service should run
     *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
     *@param interval The interval of the frequency recurrence
     *@param count The number of times to repeat
     *@param endTime The time in milliseconds the service should expire
     *@throws JobManagerException if the context or the job cannot be persisted
     */
    public void schedule(String poolName, String serviceName, Map<String, ? extends Object> context, long startTime, int frequency, int interval, int count, long endTime) throws JobManagerException {
        // forward the requested pool; no job name, no retry limit
        schedule(null, poolName, serviceName, context, startTime, frequency, interval, count, endTime, -1);
    }

    /**
     * Schedule a job to start at a specific time with specific recurrence info.
     * The context is serialized to XML and stored as a RuntimeData record first; the job then
     * refers to that record by id.
     *@param jobName The name of the job
     *@param poolName The name of the pool to run the service from
     *@param serviceName The name of the service to invoke
     *@param context The context for the service
     *@param startTime The time in milliseconds the service should run
     *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
     *@param interval The interval of the frequency recurrence
     *@param count The number of times to repeat
     *@param endTime The time in milliseconds the service should expire
     *@param maxRetry The max number of retries on failure (-1 for no max)
     *@throws JobManagerException if the context cannot be serialized or stored, or the job cannot be created
     */
    public void schedule(String jobName, String poolName, String serviceName, Map<String, ? extends Object> context, long startTime, int frequency, int interval, int count, long endTime, int maxRetry) throws JobManagerException {
        if (delegator == null) {
            Debug.logWarning("No delegator referenced; cannot schedule job.", module);
            return;
        }

        // persist the context
        String dataId = null;
        try {
            GenericValue runtimeData = delegator.makeValue("RuntimeData");
            runtimeData.set("runtimeInfo", XmlSerializer.serialize(context));
            runtimeData = delegator.createSetNextSeqId(runtimeData);
            dataId = runtimeData.getString("runtimeDataId");
        } catch (GenericEntityException ee) {
            throw new JobManagerException(ee.getMessage(), ee);
        } catch (SerializeException se) {
            throw new JobManagerException(se.getMessage(), se);
        } catch (IOException ioe) {
            throw new JobManagerException(ioe.getMessage(), ioe);
        }

        // schedule the job
        schedule(jobName, poolName, serviceName, dataId, startTime, frequency, interval, count, endTime, maxRetry);
    }

    /**
     * Schedule a job to run once at a specific time, using an already persisted context.
     * No recurrence is created and no retry limit is set.
     *@param poolName The name of the pool to run the service from
     *@param serviceName The name of the service to invoke
     *@param dataId The persisted context (RuntimeData.runtimeDataId)
     *@param startTime The time in milliseconds the service should run
     *@throws JobManagerException if the job cannot be created
     */
    public void schedule(String poolName, String serviceName, String dataId, long startTime) throws JobManagerException {
        // frequency -1 disables the recurrence in the full overload
        schedule(null, poolName, serviceName, dataId, startTime, -1, 0, 1, 0, -1);
    }

    /**
     * Schedule a job to start at a specific time with specific recurrence info.
     * Creates the JobSandbox record with status <code>SERVICE_PENDING</code>. A recurrence is only
     * created when <code>frequency</code> is greater than -1 and <code>count</code> is not 0.
     *
     * <p>NOTE(review): <code>endTime</code> is accepted but never used in this method, so the
     * created recurrence carries no end time; confirm whether it should be passed to
     * RecurrenceInfo.</p>
     *@param jobName The name of the job; when empty, the current time in milliseconds is used as the name
     *@param poolName The name of the pool to run the service from; when empty, the configured send pool is used
     *@param serviceName The name of the service to invoke
     *@param dataId The persisted context (RuntimeData.runtimeDataId)
     *@param startTime The time in milliseconds the service should run
     *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
     *@param interval The interval of the frequency recurrence
     *@param count The number of times to repeat
     *@param endTime The time in milliseconds the service should expire
     *@param maxRetry The max number of retries on failure (-1 for no max)
     *@throws JobManagerException if the recurrence or the job record cannot be created
     */
    public void schedule(String jobName, String poolName, String serviceName, String dataId, long startTime, int frequency, int interval, int count, long endTime, int maxRetry) throws JobManagerException {
        if (delegator == null) {
            Debug.logWarning("No delegator referenced; cannot schedule job.", module);
            return;
        }

        // create the recurrence
        String infoId = null;
        if (frequency > -1 && count != 0) {
            try {
                RecurrenceInfo info = RecurrenceInfo.makeInfo(delegator, startTime, frequency, interval, count);
                infoId = info.primaryKey();
            } catch (RecurrenceInfoException e) {
                throw new JobManagerException(e.getMessage(), e);
            }
        }

        // set the persisted fields
        if (UtilValidate.isEmpty(jobName)) {
            jobName = Long.toString((new Date().getTime()));
        }
        Map<String, Object> jFields = UtilMisc.<String, Object>toMap("jobName", jobName, "runTime", new Timestamp(startTime),
                "serviceName", serviceName, "statusId", "SERVICE_PENDING", "recurrenceInfoId", infoId, "runtimeDataId", dataId);

        // set the pool ID
        if (UtilValidate.isNotEmpty(poolName)) {
            jFields.put("poolId", poolName);
        } else {
            jFields.put("poolId", ServiceConfigUtil.getSendPool());
        }

        // set the loader name
        jFields.put("loaderName", delegator.getDelegatorName());

        // set the max retry
        jFields.put("maxRetry", Long.valueOf(maxRetry));

        // create the value and store
        try {
            delegator.createSetNextSeqId(delegator.makeValue("JobSandbox", jFields));
        } catch (GenericEntityException e) {
            throw new JobManagerException(e.getMessage(), e);
        }
    }

    /**
     * Kill a JobInvoker Thread.
     * Delegates to the poller; the poller reference is cleared by {@link #shutdown()}.
     * @param threadName Name of the JobInvoker Thread to kill.
     */
    public void killThread(String threadName) {
        jp.killThread(threadName);
    }

    /**
     * Get a List of each threads current state.
     * Delegates to the poller; the poller reference is cleared by {@link #shutdown()}.
     * @return List containing a Map of each thread's state.
     */
    public List<Map<String, Object>> processList() {
        return jp.getPoolState();
    }

    /** Close out the scheduler thread. Does nothing if the poller has already been stopped. */
    public void shutdown() {
        if (jp != null) {
            jp.stop();
            jp = null;
            Debug.logInfo("JobManager: Stopped Scheduler Thread.", module);
        }
    }

    /** Stops the poller when the manager is collected; the superclass finalizer always runs. */
    @Override
    public void finalize() throws Throwable {
        try {
            this.shutdown();
        } finally {
            super.finalize();
        }
    }

    /**
     * Gets the recurrence info object for a job.
     *
     * @param job the JobSandbox value; may be null
     * @return the recurrence info, or null when the job is null, has no
     *         <code>recurrenceInfoId</code>, has a <code>cancelDateTime</code> set, has no related
     *         RecurrenceInfo record, or when loading it fails (the failure is logged)
     */
    public static RecurrenceInfo getRecurrenceInfo(GenericValue job) {
        try {
            if (job == null || UtilValidate.isEmpty(job.getString("recurrenceInfoId"))) {
                return null;
            }
            if (job.get("cancelDateTime") != null) {
                // cancel has been flagged, no more recurrence
                return null;
            }
            GenericValue ri = job.getRelatedOne("RecurrenceInfo");
            if (ri != null) {
                return new RecurrenceInfo(ri);
            }
        } catch (GenericEntityException e) {
            Debug.logError(e, "Problem getting RecurrenceInfo entity from JobSandbox", module);
        } catch (RecurrenceInfoException re) {
            Debug.logError(re, "Problem creating RecurrenceInfo instance: " + re.getMessage(), module);
        }
        return null;
    }
}