// blob: 43e93a945de38c9e71c016bec92a2ac5ca02d36f [file] [log] [blame]
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*******************************************************************************/
package org.ofbiz.service.job;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.ofbiz.base.start.Start;
import org.ofbiz.base.util.Assert;
import org.ofbiz.base.util.Debug;
import org.ofbiz.service.config.ServiceConfigUtil;
/**
 * Job poller. Queues and runs jobs.
 *
 * <p>The class holds a single shared {@link ThreadPoolExecutor} whose sizing is read
 * from the <code>thread-pool</code> configuration element, and a registry of
 * {@link JobManager} instances keyed by delegator name. Unless polling is disabled
 * (<code>poll-enabled="false"</code>), the singleton starts a non-daemon thread named
 * <code>OFBiz-JobPoller</code> that periodically polls every registered job manager
 * and submits the returned jobs to the executor.</p>
 */
public final class JobPoller {

    public static final String module = JobPoller.class.getName();
    private static final AtomicInteger created = new AtomicInteger();
    private static final int MIN_THREADS = 1; // Must be no less than one or the executor will shut down.
    private static final int MAX_THREADS = 5; // Values higher than 5 might slow things down.
    private static final int POLL_WAIT = 30000; // Database polling interval - 30 seconds.
    private static final int QUEUE_SIZE = 100;
    private static final long THREAD_TTL = 120000; // Idle thread lifespan - 2 minutes.
    private static final ConcurrentHashMap<String, JobManager> jobManagers = new ConcurrentHashMap<String, JobManager>();
    // The executor must be initialized before the instance: the JobPoller constructor
    // starts the poller thread, and that thread uses the executor.
    private static final ThreadPoolExecutor executor = createExecutor();
    private static final JobPoller instance = new JobPoller();

    /**
     * Returns the <code>JobPoller</code> instance.
     */
    public static JobPoller getInstance() {
        return instance;
    }

    /**
     * Builds the job executor from the <code>thread-pool</code> configuration.
     *
     * <p><code>ThreadPoolExecutor</code> rejects a maximum pool size that is smaller than
     * the core pool size with an <code>IllegalArgumentException</code>, which would make
     * this class fail to initialize. An inconsistent configuration is therefore repaired
     * by raising the maximum to the minimum, and a warning is logged.</p>
     */
    private static ThreadPoolExecutor createExecutor() {
        int coreThreads = minThreads();
        int maximumThreads = maxThreads();
        if (maximumThreads < coreThreads) {
            Debug.logWarning("Maximum threads [" + maximumThreads + "] is less than minimum threads [" + coreThreads
                    + "]. Using the minimum threads value as the maximum.", module);
            maximumThreads = coreThreads;
        }
        return new ThreadPoolExecutor(coreThreads, maximumThreads, getTTL(), TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(queueSize()), new JobInvokerThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    }

    /**
     * Reads a positive integer attribute from the <code>thread-pool</code> element.
     *
     * @param attrName the attribute to read
     * @param description wording used in the error message when the value is not a number
     * @param defaultValue value returned when the attribute is missing, empty,
     *        not a number, or not greater than zero
     * @return the configured value if it is a positive integer, otherwise <code>defaultValue</code>
     */
    private static int positiveIntAttr(String attrName, String description, int defaultValue) {
        String attrValue = ServiceConfigUtil.getElementAttr("thread-pool", attrName);
        // Guard against null as well as empty: a missing element/attribute must fall
        // back to the default instead of breaking static initialization with an NPE.
        if (attrValue != null && !attrValue.isEmpty()) {
            try {
                int parsed = Integer.parseInt(attrValue);
                if (parsed > 0) {
                    return parsed;
                }
            } catch (NumberFormatException e) {
                Debug.logError("Exception thrown while parsing " + description + " from serviceengine.xml file [" + e + "]. Using default value.", module);
            }
        }
        return defaultValue;
    }

    /** Returns the idle thread lifespan in milliseconds (attribute <code>ttl</code>). */
    private static long getTTL() {
        // Zero is never returned for a configured value, so it safely marks "not configured".
        int configured = positiveIntAttr("ttl", "thread TTL", 0);
        return configured > 0 ? configured : THREAD_TTL;
    }

    /** Returns the maximum number of executor threads (attribute <code>max-threads</code>). */
    private static int maxThreads() {
        return positiveIntAttr("max-threads", "maximum threads", MAX_THREADS);
    }

    /** Returns the core number of executor threads (attribute <code>min-threads</code>). */
    private static int minThreads() {
        return positiveIntAttr("min-threads", "minimum threads", MIN_THREADS);
    }

    /** Returns the polling interval in milliseconds (attribute <code>poll-db-millis</code>). */
    private static int pollWaitTime() {
        return positiveIntAttr("poll-db-millis", "database polling interval", POLL_WAIT);
    }

    /** Returns the capacity of the executor's job queue (attribute <code>jobs</code>). */
    private static int queueSize() {
        return positiveIntAttr("jobs", "queue size", QUEUE_SIZE);
    }

    /**
     * Register a {@link JobManager} with the job poller. A manager is stored under its
     * delegator name; if one is already registered under that name it is kept.
     *
     * @param jm The <code>JobManager</code> to register.
     * @throws IllegalArgumentException if <code>jm</code> is null
     */
    public static void registerJobManager(JobManager jm) {
        Assert.notNull("jm", jm);
        jobManagers.putIfAbsent(jm.getDelegator().getDelegatorName(), jm);
    }

    // -------------------------------------- //

    // Null when polling is disabled in the configuration.
    private final Thread jobManagerPollerThread;

    private JobPoller() {
        if (pollEnabled()) {
            jobManagerPollerThread = new Thread(new JobManagerPoller(), "OFBiz-JobPoller");
            jobManagerPollerThread.setDaemon(false);
            jobManagerPollerThread.start();
        } else {
            jobManagerPollerThread = null;
        }
    }

    /**
     * Returns a <code>Map</code> containing <code>JobPoller</code> statistics.
     *
     * <p>The map holds the executor's pool figures plus a <code>taskList</code> entry:
     * one map per job currently waiting in the executor queue, with the keys
     * <code>id</code>, <code>name</code>, <code>serviceName</code>, <code>time</code>
     * and <code>runtime</code>.</p>
     */
    public Map<String, Object> getPoolState() {
        Map<String, Object> poolState = new HashMap<String, Object>();
        poolState.put("keepAliveTimeInSeconds", executor.getKeepAliveTime(TimeUnit.SECONDS));
        poolState.put("numberOfCoreInvokerThreads", executor.getCorePoolSize());
        poolState.put("currentNumberOfInvokerThreads", executor.getPoolSize());
        poolState.put("numberOfActiveInvokerThreads", executor.getActiveCount());
        poolState.put("maxNumberOfInvokerThreads", executor.getMaximumPoolSize());
        poolState.put("greatestNumberOfInvokerThreads", executor.getLargestPoolSize());
        poolState.put("numberOfCompletedTasks", executor.getCompletedTaskCount());
        BlockingQueue<Runnable> queue = executor.getQueue();
        List<Map<String, Object>> taskList = new ArrayList<Map<String, Object>>();
        for (Runnable task : queue) {
            // Only Job instances are ever submitted to the executor (see queueNow).
            Job job = (Job) task;
            Map<String, Object> taskInfo = new HashMap<String, Object>();
            taskInfo.put("id", job.getJobId());
            taskInfo.put("name", job.getJobName());
            String serviceName = "";
            if (job instanceof GenericServiceJob) {
                serviceName = ((GenericServiceJob) job).getServiceName();
            }
            taskInfo.put("serviceName", serviceName);
            taskInfo.put("time", job.getStartTime());
            taskInfo.put("runtime", job.getRuntime());
            taskList.add(taskInfo);
        }
        poolState.put("taskList", taskList);
        return poolState;
    }

    // Polling is enabled unless the poll-enabled attribute is explicitly "false".
    private boolean pollEnabled() {
        String enabled = ServiceConfigUtil.getElementAttr("thread-pool", "poll-enabled");
        return !"false".equalsIgnoreCase(enabled);
    }

    /**
     * Adds a job to the job queue.
     *
     * <p>The job is marked as queued and then submitted to the executor. If the
     * submission fails - for example with a {@link RejectedExecutionException} because
     * the queue is full or the poller has been stopped - the failure is logged and the
     * job is dequeued. The submission failure itself is not propagated to the caller.</p>
     *
     * @param job the job to queue
     * @throws InvalidJobException if the job is in an invalid state (propagated from
     *         the job's <code>queue()</code> / <code>deQueue()</code> calls).
     */
    public void queueNow(Job job) throws InvalidJobException {
        job.queue();
        try {
            executor.execute(job);
        } catch (Exception e) {
            // Deliberately best-effort: the poller may offer more jobs than the queue
            // can hold. Log the reason instead of hiding it, then release the job.
            Debug.logWarning("Unable to queue job [" + job.getJobId() + "], the job will be dequeued: " + e, module);
            job.deQueue();
        }
    }

    /**
     * Stops the <code>JobPoller</code>. This method is called when OFBiz shuts down.
     * The <code>JobPoller</code> cannot be restarted.
     *
     * <p>The poller thread (if any) is interrupted, the executor is shut down
     * immediately, and every job that was still waiting in the queue is dequeued.</p>
     */
    public void stop() {
        Debug.logInfo("Shutting down JobPoller.", module);
        if (jobManagerPollerThread != null) {
            jobManagerPollerThread.interrupt();
        }
        List<Runnable> queuedJobs = executor.shutdownNow();
        for (Runnable task : queuedJobs) {
            try {
                Job queuedJob = (Job) task;
                queuedJob.deQueue();
            } catch (Exception e) {
                // One failing job must not prevent the remaining jobs from being dequeued.
                Debug.logWarning(e, module);
            }
        }
        Debug.logInfo("JobPoller shutdown completed.", module);
    }

    // Creates the executor threads, named "OFBiz-JobQueue-<n>" with a running counter.
    private static class JobInvokerThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "OFBiz-JobQueue-" + created.getAndIncrement());
        }
    }

    // Polls all registered JobManagers for jobs to queue.
    private class JobManagerPoller implements Runnable {

        // Do not check for interrupts in this method. The design requires the
        // thread to complete the job manager poll uninterrupted.
        @Override
        public void run() {
            Debug.logInfo("JobPoller thread started.", module);
            try {
                // Wait until the server reports that it is running before polling.
                while (Start.getInstance().getCurrentState() != Start.ServerState.RUNNING) {
                    Thread.sleep(1000);
                }
                while (!executor.isShutdown()) {
                    int remainingCapacity = executor.getQueue().remainingCapacity();
                    if (remainingCapacity > 0) {
                        // Build "list of lists"
                        Collection<JobManager> jmCollection = jobManagers.values();
                        List<Iterator<Job>> pollResults = new ArrayList<Iterator<Job>>();
                        for (JobManager jm : jmCollection) {
                            pollResults.add(jm.poll(remainingCapacity).iterator());
                        }
                        // Create queue candidate list from "list of lists".
                        // One job is taken from each manager per pass (round-robin), so
                        // the candidates are interleaved across job managers.
                        List<Job> queueCandidates = new ArrayList<Job>();
                        boolean addingJobs = true;
                        while (addingJobs) {
                            addingJobs = false;
                            for (Iterator<Job> jobIterator : pollResults) {
                                if (jobIterator.hasNext()) {
                                    queueCandidates.add(jobIterator.next());
                                    addingJobs = true;
                                }
                            }
                        }
                        // The candidate list might be larger than the queue remaining capacity,
                        // but that is okay - the excess jobs will be dequeued and rescheduled.
                        for (Job job : queueCandidates) {
                            try {
                                queueNow(job);
                            } catch (InvalidJobException e) {
                                Debug.logError(e, module);
                            }
                        }
                    }
                    // The interval is re-read from the configuration on every cycle.
                    Thread.sleep(pollWaitTime());
                }
            } catch (InterruptedException e) {
                // Happens when JobPoller shuts down - nothing to do.
                Thread.currentThread().interrupt();
            }
            Debug.logInfo("JobPoller thread stopped.", module);
        }
    }
}