blob: e3e1dd18d3547da310acd6e64f2450aac445e597 [file] [log] [blame]
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*******************************************************************************/
package org.ofbiz.service.job;
import java.util.Date;
import org.apache.commons.lang.math.NumberUtils;
import org.ofbiz.base.util.Debug;
import org.ofbiz.base.util.UtilDateTime;
import org.ofbiz.entity.transaction.GenericTransactionException;
import org.ofbiz.entity.transaction.TransactionUtil;
import org.ofbiz.security.authz.AbstractAuthorization;
import org.ofbiz.service.config.ServiceConfigUtil;
/**
* JobInvoker
*/
public class JobInvoker implements Runnable {
public static final String module = JobInvoker.class.getName();
public static final long THREAD_TTL = 18000000;
public static final int WAIT_TIME = 750;
private JobPoller jp = null;
private Thread thread = null;
private Date created = null;
private String name = null;
private int count = 0;
private int wait = 0;
private volatile boolean run = false;
private volatile Job currentJob = null;
private volatile int statusCode = 0;
private volatile long jobStart = 0;
public JobInvoker(JobPoller jp) {
this(jp, WAIT_TIME);
}
public JobInvoker(JobPoller jp, int wait) {
this.created = new Date();
this.run = true;
this.count = 0;
this.jp = jp;
this.wait = wait;
// service dispatcher delegator name (for thread name)
String delegatorName = jp.getManager().getDelegator().getDelegatorName();
// get a new thread
this.thread = new Thread(this);
this.name = delegatorName + "-invoker-" + this.thread.getName();
this.thread.setDaemon(false);
this.thread.setName(this.name);
if (Debug.verboseOn()) Debug.logVerbose("JobInvoker: Starting Invoker Thread -- " + thread.getName(), module);
this.thread.start();
}
protected JobInvoker() {}
/**
* Tells the thread to stop after the next job.
*/
public void stop() {
run = false;
}
/**
* Wakes up this thread.
*/
public void wakeUp() {
notifyAll();
}
/**
* Gets the number of times this thread was used.
* @return The number of times used.
*/
public int getUsage() {
return count;
}
/**
* Gets the remaining time this thread has before it is killed
* @return Time in millis remaining
*/
public long getTimeRemaining() {
long now = UtilDateTime.nowTimestamp().getTime();
long time = getTime();
long ttl = getTTL();
return (time + ttl) - now;
}
/**
* Gets the time when this thread was created.
* @return Time in milliseconds when this was created.
*/
public long getTime() {
return created.getTime();
}
/**
* Gets the name of this JobInvoker.
* @return Name of the invoker.
*/
public String getName() {
return this.name;
}
/**
* Gets the status code for this thread (0 = sleeping, 1 = running job)
* @return 0 for sleeping or 1 when running a job.
*/
public int getCurrentStatus() {
return this.statusCode;
}
/**
* Gets the total time the current job has been running or 0 when sleeping.
* @return Total time the curent job has been running.
*/
public long getCurrentRuntime() {
if (this.jobStart > 0) {
long now = System.currentTimeMillis();
return now - this.jobStart;
} else {
return 0;
}
}
public Long getThreadId() {
if (this.thread != null) {
return this.thread.getId();
} else {
return null;
}
}
/**
* Get the current running job's ID.
* @return String ID of the current running job.
*/
public String getJobId() {
if (this.statusCode == 1) {
if (this.currentJob != null) {
return this.currentJob.getJobId();
} else {
return "WARNING: Invalid Job!";
}
} else {
return null;
}
}
/**
* Get the current running job's name.
* @return String name of the current running job.
*/
public String getJobName() {
if (this.statusCode == 1) {
if (this.currentJob != null) {
return this.currentJob.getJobName();
} else {
return "WARNING: Invalid Job!";
}
} else {
return null;
}
}
/**
* Returns the name of the service being run.
* @return The name of the service being run.
*/
public String getServiceName() {
String serviceName = null;
if (this.statusCode == 1) {
if (this.currentJob != null) {
if (this.currentJob instanceof GenericServiceJob) {
GenericServiceJob gsj = (GenericServiceJob) this.currentJob;
try {
serviceName = gsj.getServiceName();
} catch (InvalidJobException e) {
Debug.logError(e, module);
}
}
}
}
return serviceName;
}
/**
* Kill this invoker thread.s
*/
public void kill() {
this.stop();
this.statusCode = -1;
this.thread.interrupt();
this.thread = null;
}
public synchronized void run() {
while (run) {
Job job = jp.next();
if (job == null) {
try {
java.lang.Thread.sleep(wait);
} catch (InterruptedException ie) {
Debug.logError(ie, "JobInvoker.run() : InterruptedException", module);
stop();
}
} else {
Debug.logInfo("Invoker [" + thread.getName() + "] received job [" + job.getJobName() + "] from poller [" + jp.toString() + "]", module);
// setup the current job settings
this.currentJob = job;
this.statusCode = 1;
this.jobStart = System.currentTimeMillis();
// execute the job
if (Debug.verboseOn()) Debug.logVerbose("Invoker: " + thread.getName() + " executing job -- " + job.getJobName(), module);
try {
job.exec();
} catch (InvalidJobException e) {
Debug.logWarning(e.getMessage(), module);
}
if (Debug.verboseOn()) Debug.logVerbose("Invoker: " + thread.getName() + " finished executing job -- " + job.getJobName(), module);
// clear the current job settings
this.currentJob = null;
this.statusCode = 0;
this.jobStart = 0;
// sanity check; make sure we don't have any transactions in place
try {
// roll back current TX first
if (TransactionUtil.isTransactionInPlace()) {
Debug.logWarning("*** NOTICE: JobInvoker finished w/ a transaction in place! Rolling back.", module);
TransactionUtil.rollback();
}
// now resume/rollback any suspended txs
if (TransactionUtil.suspendedTransactionsHeld()) {
int suspended = TransactionUtil.cleanSuspendedTransactions();
Debug.logWarning("Resumed/Rolled Back [" + suspended + "] transactions.", module);
}
} catch (GenericTransactionException e) {
Debug.logWarning(e, module);
}
// increment the count
count++;
if (Debug.verboseOn()) Debug.logVerbose("Invoker: " + thread.getName() + " (" + count + ") total.", module);
// reset thread local security
AbstractAuthorization.clearThreadLocal();
}
long diff = (new Date().getTime() - this.getTime());
if (getTTL() > 0 && diff > getTTL())
jp.removeThread(this);
}
if (Debug.verboseOn()) Debug.logVerbose("Invoker: " + thread.getName() + " dead -- " + UtilDateTime.nowTimestamp(), module);
}
private long getTTL() {
long ttl = THREAD_TTL;
try {
ttl = NumberUtils.toLong(ServiceConfigUtil.getElementAttr("thread-pool", "ttl"));
} catch (NumberFormatException nfe) {
Debug.logError("Problems reading value from attribute [ttl] of element [thread-pool] in serviceengine.xml file [" + nfe.toString() + "]. Using default (" + THREAD_TTL + ").", module);
}
return ttl;
}
}