blob: f009b4ee88c9052154a374301df09cf38ed7e001 [file] [log] [blame]
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*******************************************************************************/
package org.apache.ofbiz.service.job;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ofbiz.base.config.GenericConfigException;
import org.apache.ofbiz.base.start.Start;
import org.apache.ofbiz.base.util.Assert;
import org.apache.ofbiz.base.util.Debug;
import org.apache.ofbiz.service.config.ServiceConfigUtil;
import org.apache.ofbiz.service.config.ServiceConfigListener;
import org.apache.ofbiz.service.config.model.ServiceConfig;
import org.apache.ofbiz.service.config.model.ThreadPool;
/**
* Job poller. Queues and runs jobs.
*/
public final class JobPoller implements ServiceConfigListener {
public static final String module = JobPoller.class.getName();
private static final AtomicInteger created = new AtomicInteger();
private static final ConcurrentHashMap<String, JobManager> jobManagers = new ConcurrentHashMap<String, JobManager>();
private static final ThreadPoolExecutor executor = createThreadPoolExecutor();
private static final JobPoller instance = new JobPoller();
/**
* Returns the <code>JobPoller</code> instance.
*/
public static JobPoller getInstance() {
return instance;
}
private static ThreadPoolExecutor createThreadPoolExecutor() {
try {
ThreadPool threadPool = ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool();
return new ThreadPoolExecutor(threadPool.getMinThreads(), threadPool.getMaxThreads(), threadPool.getTtl(),
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(threadPool.getJobs()), new JobInvokerThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
} catch (GenericConfigException e) {
Debug.logError(e, "Exception thrown while getting <thread-pool> model, using default <thread-pool> values: ", module);
return new ThreadPoolExecutor(ThreadPool.MIN_THREADS, ThreadPool.MAX_THREADS, ThreadPool.THREAD_TTL,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(ThreadPool.QUEUE_SIZE), new JobInvokerThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
}
}
private static int pollWaitTime() {
try {
ThreadPool threadPool = ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool();
return threadPool.getPollDbMillis();
} catch (GenericConfigException e) {
Debug.logError(e, "Exception thrown while getting <thread-pool> model, using default <thread-pool> values: ", module);
return ThreadPool.POLL_WAIT;
}
}
/**
* Register a {@link JobManager} with the job poller.
*
* @param jm The <code>JobManager</code> to register.
* @throws IllegalArgumentException if <code>jm</code> is null
*/
public static void registerJobManager(JobManager jm) {
Assert.notNull("jm", jm);
jobManagers.putIfAbsent(jm.getDelegator().getDelegatorName(), jm);
}
// -------------------------------------- //
private final Thread jobManagerPollerThread;
private JobPoller() {
if (pollEnabled()) {
jobManagerPollerThread = new Thread(new JobManagerPoller(), "OFBiz-JobPoller");
jobManagerPollerThread.setDaemon(false);
jobManagerPollerThread.start();
} else {
jobManagerPollerThread = null;
}
ServiceConfigUtil.registerServiceConfigListener(this);
}
/**
* Returns a <code>Map</code> containing <code>JobPoller</code> statistics.
*/
public Map<String, Object> getPoolState() {
Map<String, Object> poolState = new HashMap<String, Object>();
poolState.put("keepAliveTimeInSeconds", executor.getKeepAliveTime(TimeUnit.SECONDS));
poolState.put("numberOfCoreInvokerThreads", executor.getCorePoolSize());
poolState.put("currentNumberOfInvokerThreads", executor.getPoolSize());
poolState.put("numberOfActiveInvokerThreads", executor.getActiveCount());
poolState.put("maxNumberOfInvokerThreads", executor.getMaximumPoolSize());
poolState.put("greatestNumberOfInvokerThreads", executor.getLargestPoolSize());
poolState.put("numberOfCompletedTasks", executor.getCompletedTaskCount());
BlockingQueue<Runnable> queue = executor.getQueue();
List<Map<String, Object>> taskList = new ArrayList<Map<String, Object>>();
Map<String, Object> taskInfo = null;
for (Runnable task : queue) {
Job job = (Job) task;
taskInfo = new HashMap<String, Object>();
taskInfo.put("id", job.getJobId());
taskInfo.put("name", job.getJobName());
String serviceName = "";
if (job instanceof GenericServiceJob) {
serviceName = ((GenericServiceJob) job).getServiceName();
}
taskInfo.put("serviceName", serviceName);
taskInfo.put("time", job.getStartTime());
taskInfo.put("runtime", job.getRuntime());
taskList.add(taskInfo);
}
poolState.put("taskList", taskList);
return poolState;
}
@Override
public void onServiceConfigChange(ServiceConfig serviceConfig) {
if (!executor.isShutdown()) {
ThreadPool threadPool = serviceConfig.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool();
executor.setCorePoolSize(threadPool.getMinThreads());
executor.setMaximumPoolSize(threadPool.getMaxThreads());
executor.setKeepAliveTime(threadPool.getTtl(), TimeUnit.MILLISECONDS);
}
}
private boolean pollEnabled() {
try {
return ServiceConfigUtil.getServiceEngine().getThreadPool().getPollEnabled();
} catch (GenericConfigException e) {
Debug.logWarning(e, "Exception thrown while getting configuration: ", module);
return false;
}
}
/**
* Adds a job to the job queue.
* @throws InvalidJobException if the job is in an invalid state.
* @throws RejectedExecutionException if the poller is stopped.
*/
public void queueNow(Job job) throws InvalidJobException {
job.queue();
try {
executor.execute(job);
} catch (Exception e) {
job.deQueue();
}
}
/**
* Stops the <code>JobPoller</code>. This method is called when OFBiz shuts down.
* The <code>JobPoller</code> cannot be restarted.
*/
public void stop() {
Debug.logInfo("Shutting down JobPoller.", module);
if (jobManagerPollerThread != null) {
jobManagerPollerThread.interrupt();
}
List<Runnable> queuedJobs = executor.shutdownNow();
for (Runnable task : queuedJobs) {
try {
Job queuedJob = (Job) task;
queuedJob.deQueue();
} catch (Exception e) {
Debug.logWarning(e, module);
}
}
Debug.logInfo("JobPoller shutdown completed.", module);
}
private static class JobInvokerThreadFactory implements ThreadFactory {
public Thread newThread(Runnable runnable) {
return new Thread(runnable, "OFBiz-JobQueue-" + created.getAndIncrement());
}
}
// Polls all registered JobManagers for jobs to queue.
private class JobManagerPoller implements Runnable {
// Do not check for interrupts in this method. The design requires the
// thread to complete the job manager poll uninterrupted.
public void run() {
Debug.logInfo("JobPoller thread started.", module);
try {
while (Start.getInstance().getCurrentState() != Start.ServerState.RUNNING) {
Thread.sleep(1000);
}
while (!executor.isShutdown()) {
int remainingCapacity = executor.getQueue().remainingCapacity();
if (remainingCapacity > 0) {
// Build "list of lists"
Collection<JobManager> jmCollection = jobManagers.values();
List<Iterator<Job>> pollResults = new ArrayList<Iterator<Job>>();
for (JobManager jm : jmCollection) {
if (!jm.isAvailable()) {
if (Debug.infoOn()) Debug.logInfo("The job manager is locked.", module);
continue;
}
jm.reloadCrashedJobs();
pollResults.add(jm.poll(remainingCapacity).iterator());
}
// Create queue candidate list from "list of lists"
List<Job> queueCandidates = new ArrayList<Job>();
boolean addingJobs = true;
while (addingJobs) {
addingJobs = false;
for (Iterator<Job> jobIterator : pollResults) {
if (jobIterator.hasNext()) {
queueCandidates.add(jobIterator.next());
addingJobs = true;
}
}
}
// The candidate list might be larger than the queue remaining capacity,
// but that is okay - the excess jobs will be dequeued and rescheduled.
for (Job job : queueCandidates) {
try {
queueNow(job);
} catch (InvalidJobException e) {
Debug.logError(e, module);
}
}
}
Thread.sleep(pollWaitTime());
}
} catch (InterruptedException e) {
// Happens when JobPoller shuts down - nothing to do.
Thread.currentThread().interrupt();
}
Debug.logInfo("JobPoller thread stopped.", module);
}
}
}