blob: 56d54e6b6a27daceadd962abd0f5bf0822ff4142 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sling.commons.scheduler.impl;
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.sling.commons.scheduler.Job;
import org.apache.sling.commons.scheduler.ScheduleOptions;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.commons.threads.ThreadPoolManager;
import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleEvent;
import org.osgi.framework.BundleListener;
import org.osgi.framework.Constants;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.metatype.annotations.Designate;
import org.quartz.CronExpression;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.SimpleTrigger;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.MetricRegistry;
/**
* The quartz based implementation of the scheduler.
*
*/
@Component(
service = QuartzScheduler.class,
property = {
Constants.SERVICE_VENDOR + "=The Apache Software Foundation"
},
immediate = true
)
@Designate(
ocd = QuartzSchedulerConfiguration.class
)
public class QuartzScheduler implements BundleListener {
/** Map key for the job object */
static final String DATA_MAP_OBJECT = "QuartzJobScheduler.Object";
/** Map key for the provided job name */
static final String DATA_MAP_PROVIDED_NAME = "QuartzJobScheduler.ProvidedJobName";
/** Map key for the job name */
static final String DATA_MAP_NAME = "QuartzJobScheduler.JobName";
/** Map key for the configuration. */
static final String DATA_MAP_CONFIGURATION = "QuartzJobScheduler.Configuration";
/** Map key for the logger. */
static final String DATA_MAP_LOGGER = "QuartzJobScheduler.Logger";
/** Map key for the runOn information (String[]). */
static final String DATA_MAP_RUN_ON = "QuartzJobScheduler.runOn";
/** Map key for the bundle information (Long). */
static final String DATA_MAP_BUNDLE_ID = "QuartzJobScheduler.bundleId";
/** Map key for the bundle information (Long). */
static final String DATA_MAP_SERVICE_ID = "QuartzJobScheduler.serviceId";
/** Map key for the quartz scheduler */
static final String DATA_MAP_QUARTZ_SCHEDULER = "QuartzJobScheduler.QuartzScheduler";
static final String DATA_MAP_THREAD_POOL_NAME = "QuartzJobScheduler.threadPoolName";
static final String METRICS_NAME_RUNNING_JOBS = "commons.scheduler.running.jobs";
static final String METRICS_NAME_TIMER = "commons.scheduler.timer";
static final String METRICS_NAME_OLDEST_RUNNING_JOB_MILLIS = "commons.scheduler.oldest.running.job.millis";
// property which contains the component name ( = class names) for service references
static final String COMPONENT_NAME = "component.name";
/** Default logger. */
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Logger defaultThreadPoolLogger = LoggerFactory.getLogger(this.getClass().getName() + ".defaultThreadPool");
private static final int STACKTRACE_DEPTH = 10;
@Reference
private ThreadPoolManager threadPoolManager;
@Reference(target = "(name=sling)")
MetricRegistry metricsRegistry;
ConfigHolder configHolder;
/** The quartz schedulers. */
private final Map<String, SchedulerProxy> schedulers = new HashMap<>();
private volatile String defaultPoolName;
private volatile String[] allowedPoolNames;
private volatile boolean active;
/**
* Activate this component.
* Start the scheduler.
* @throws Exception
*/
@Activate
protected void activate(final BundleContext ctx, final QuartzSchedulerConfiguration configuration) {
// SLING-2261 Prevent Quartz from checking for updates
System.setProperty("org.terracotta.quartz.skipUpdateCheck", Boolean.TRUE.toString());
QuartzJobExecutor.FORCE_LEADER.set(configuration.scheduler_useleaderforsingle());
defaultPoolName = configuration.poolName();
allowedPoolNames = configuration.allowedPoolNames();
if (allowedPoolNames == null) {
allowedPoolNames = new String[0];
}
ctx.addBundleListener(this);
this.configHolder = new ConfigHolder(configuration);
this.active = true;
}
/**
* Deactivate this component.
* Stop the scheduler.
*/
@Deactivate
protected void deactivate(final BundleContext ctx) {
ctx.removeBundleListener(this);
final Map<String, SchedulerProxy> proxies;
synchronized ( this.schedulers ) {
this.active = false;
proxies = new HashMap<>(this.schedulers);
this.schedulers.clear();
}
for(final SchedulerProxy proxy : proxies.values()) {
proxy.dispose();
}
}
/**
* Get the thread pool name to use based on the provided/configured name
* @param name The configured name
* @return The name to use
*/
private String getThreadPoolName(final String name) {
// no name specified
if ( name == null || name.trim().isEmpty() ) {
return this.defaultPoolName;
}
// checked allowed list
for(final String n : this.allowedPoolNames) {
if ( name.trim().equals(n) ) {
return n;
}
}
logger.warn("Scheduler job requested thread pool with name " + name + " but this thread pool is not in the list of allowed pools.");
return this.defaultPoolName;
}
/**
* @see org.osgi.framework.BundleListener#bundleChanged(org.osgi.framework.BundleEvent)
*/
@Override
public void bundleChanged(final BundleEvent event) {
if ( event.getType() == BundleEvent.STOPPED ) {
final Long bundleId = event.getBundle().getBundleId();
synchronized ( this.schedulers ) {
if ( this.active ) {
for(final SchedulerProxy proxy : this.schedulers.values()) {
try {
final List<String> groups = proxy.getScheduler().getJobGroupNames();
for(final String group : groups) {
final Set<JobKey> keys = proxy.getScheduler().getJobKeys(GroupMatcher.jobGroupEquals(group));
for(final JobKey key : keys) {
final JobDetail detail = proxy.getScheduler().getJobDetail(key);
if ( detail != null ) {
final String jobName = (String) detail.getJobDataMap().get(QuartzScheduler.DATA_MAP_NAME);
final Object job = detail.getJobDataMap().get(QuartzScheduler.DATA_MAP_OBJECT);
if ( jobName != null && job != null ) {
final Long jobBundleId = (Long) detail.getJobDataMap().get(QuartzScheduler.DATA_MAP_BUNDLE_ID);
if ( jobBundleId != null && jobBundleId.equals(bundleId) ) {
proxy.getScheduler().deleteJob(key);
this.logger.debug("Unscheduling job with name {}", jobName);
}
}
}
}
}
} catch ( final SchedulerException ignore) {
// we ignore this as there is nothing to do
}
}
}
}
}
}
/**
* Initialize the data map for the job executor.
* @param jobName
* @param job
* @param config
* @param concurent
* @return
*/
private JobDataMap initDataMap(final Long bundleId,
final Long serviceId,
final String jobName,
final Object job,
final InternalScheduleOptions options) {
final JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put(DATA_MAP_OBJECT, job);
if ( options.providedName != null ) {
jobDataMap.put(DATA_MAP_PROVIDED_NAME, options.providedName);
}
jobDataMap.put(DATA_MAP_NAME, jobName);
jobDataMap.put(DATA_MAP_LOGGER, this.logger);
jobDataMap.put(DATA_MAP_QUARTZ_SCHEDULER, this);
jobDataMap.put(DATA_MAP_THREAD_POOL_NAME, getThreadPoolName(options.threadPoolName));
if ( bundleId != null ) {
jobDataMap.put(DATA_MAP_BUNDLE_ID, bundleId);
}
if ( serviceId != null ) {
jobDataMap.put(DATA_MAP_SERVICE_ID, serviceId);
}
if ( options.configuration != null ) {
jobDataMap.put(DATA_MAP_CONFIGURATION, options.configuration);
}
if ( options.runOn != null) {
if ( options.runOn.length > 1
|| (!Scheduler.VALUE_RUN_ON_LEADER.equals(options.runOn[0]) && !Scheduler.VALUE_RUN_ON_SINGLE.equals(options.runOn[0]))) {
logger.warn("Job {} ({}) is scheduled to run on specific Sling Instances. This feature is deprecated. Please don't use it anymore.", jobName, job);
}
jobDataMap.put(DATA_MAP_RUN_ON, options.runOn);
}
return jobDataMap;
}
/**
* Create the job detail.
* @param name
* @param jobDataMap
* @return
*/
private JobDetail createJobDetail(final String name,
final JobDataMap jobDataMap,
final boolean concurrent) {
final JobDetail detail = JobBuilder.newJob((concurrent ? QuartzJobExecutor.class : NonParallelQuartzJobExecutor.class))
.withIdentity(name)
.usingJobData(jobDataMap)
.build();
return detail;
}
/**
* Check the job object, either runnable or job is allowed
*/
private void checkJob(final Object job)
throws IllegalArgumentException {
if (!(job instanceof Runnable) && !(job instanceof Job)) {
throw new IllegalArgumentException("Job object is neither an instance of " + Runnable.class.getName() + " nor " + Job.class.getName());
}
}
/**
* @see org.apache.sling.commons.scheduler.Scheduler#addJob(java.lang.String, java.lang.Object, java.util.Map, java.lang.String, boolean)
*/
public void addJob(final Long bundleId,
final Long serviceId,
final String name,
final Object job,
final Map<String, Serializable> config,
final String schedulingExpression,
final boolean canRunConcurrently)
throws SchedulerException {
this.scheduleJob(bundleId, serviceId, job,
EXPR(schedulingExpression).name(name).config(config).canRunConcurrently(canRunConcurrently));
}
/**
* @see org.apache.sling.commons.scheduler.Scheduler#addPeriodicJob(java.lang.String, java.lang.Object, java.util.Map, long, boolean)
*/
public void addPeriodicJob(final Long bundleId,
final Long serviceId,
final String name,
final Object job,
final Map<String, Serializable> config,
final long period,
final boolean canRunConcurrently)
throws SchedulerException {
this.addPeriodicJob(bundleId, serviceId, name, job, config, period, canRunConcurrently, false);
}
/**
* @see org.apache.sling.commons.scheduler.Scheduler#addPeriodicJob(java.lang.String, java.lang.Object, java.util.Map, long, boolean, boolean)
*/
public void addPeriodicJob(final Long bundleId,
final Long serviceId,
final String name,
final Object job,
final Map<String, Serializable> config,
final long period,
final boolean canRunConcurrently,
final boolean startImmediate)
throws SchedulerException {
this.scheduleJob(bundleId, serviceId, job,
PERIODIC(period, startImmediate).name(name).config(config).canRunConcurrently(canRunConcurrently));
}
private ScheduleOptions PERIODIC(final long period, final boolean startImmediate) {
if ( period < 1 ) {
return new InternalScheduleOptions(new IllegalArgumentException("Period argument must be higher than 0"));
}
final long ms = period * 1000;
final TriggerBuilder<SimpleTrigger> builder = TriggerBuilder.newTrigger()
.startAt(new Date(System.currentTimeMillis() + ms))
.withSchedule(SimpleScheduleBuilder.simpleSchedule().repeatForever().withIntervalInMilliseconds(ms));
if ( startImmediate ) {
return new InternalScheduleOptions( builder.startNow());
} else {
return new InternalScheduleOptions( builder.startAt(new Date(System.currentTimeMillis() + ms)) );
}
}
/**
* @see org.apache.sling.commons.scheduler.Scheduler#fireJob(java.lang.Object, java.util.Map)
*/
public void fireJob(final Long bundleId,
final Long serviceId,
final Object job, final Map<String, Serializable> config)
throws SchedulerException {
this.scheduleJob(bundleId, serviceId, job,
NOW().config(config));
}
/**
* @see org.apache.sling.commons.scheduler.Scheduler#fireJobAt(java.lang.String, java.lang.Object, java.util.Map, java.util.Date)
*/
public void fireJobAt(final Long bundleId,
final Long serviceId,
final String name, final Object job, final Map<String, Serializable> config, final Date date)
throws SchedulerException {
this.scheduleJob(bundleId, serviceId, job,
AT(date).name(name).config(config));
}
/**
* @see org.apache.sling.commons.scheduler.Scheduler#fireJob(java.lang.Object, java.util.Map, int, long)
*/
public boolean fireJob(final Long bundleId,
final Long serviceId,
final Object job,
final Map<String, Serializable> config,
final int times,
final long period) {
return this.schedule(bundleId, serviceId, job,
NOW(times, period).config(config));
}
/**
* @see org.apache.sling.commons.scheduler.Scheduler#fireJobAt(java.lang.String, java.lang.Object, java.util.Map, java.util.Date, int, long)
*/
public boolean fireJobAt(final Long bundleId,
final Long serviceId,
final String name,
final Object job,
final Map<String, Serializable> config,
final Date date,
final int times,
final long period) {
return this.schedule(bundleId, serviceId, job,
AT(date, times, period).name(name).config(config));
}
/**
* @see org.apache.sling.commons.scheduler.Scheduler#removeJob(java.lang.String)
*/
public void removeJob(final Long bundleId, final String jobName) throws NoSuchElementException {
// as this method might be called from unbind and during
// unbind a deactivate could happen, we check the scheduler first
synchronized ( this.schedulers ) {
if ( this.active ) {
for(final SchedulerProxy proxy : this.schedulers.values()) {
try {
final JobKey key = JobKey.jobKey(jobName);
final JobDetail jobdetail = proxy.getScheduler().getJobDetail(key);
if (jobdetail != null) {
proxy.getScheduler().deleteJob(key);
this.logger.debug("Unscheduling job with name {}", jobName);
return;
}
} catch (final SchedulerException ignored) {
// ignore
}
}
}
}
if ( this.active ) {
throw new NoSuchElementException("No job found with name " + jobName);
}
}
/**
* @see org.apache.sling.commons.scheduler.Scheduler#NOW()
*/
public ScheduleOptions NOW() {
return new InternalScheduleOptions( TriggerBuilder.newTrigger()
.startNow());
}
/**
* @see org.apache.sling.commons.scheduler.Scheduler#NOW(int, long)
*/
public ScheduleOptions NOW(final int times, final long period) {
if ( times < 2 && times != -1 ) {
return new InternalScheduleOptions(new IllegalArgumentException("Times argument must be higher than 1 or -1"));
}
if ( period < 1 ) {
return new InternalScheduleOptions(new IllegalArgumentException("Period argument must be higher than 0"));
}
final SimpleScheduleBuilder sb;
if ( times == -1 ) {
sb = SimpleScheduleBuilder.simpleSchedule().repeatForever();
} else {
sb = SimpleScheduleBuilder.simpleSchedule().withRepeatCount(times - 1);
}
return new InternalScheduleOptions( TriggerBuilder.newTrigger()
.startNow()
.withSchedule(sb.withIntervalInMilliseconds(period * 1000)));
}
/**
* @see org.apache.sling.commons.scheduler.Scheduler#AT(java.util.Date)
*/
public ScheduleOptions AT(final Date date) {
if ( date == null ) {
return new InternalScheduleOptions(new IllegalArgumentException("Date can't be null"));
}
return new InternalScheduleOptions( TriggerBuilder.newTrigger()
.startAt(date));
}
/**
* @see org.apache.sling.commons.scheduler.Scheduler#AT(java.util.Date, int, long)
*/
public ScheduleOptions AT(final Date date, final int times, final long period) {
if ( date == null ) {
return new InternalScheduleOptions(new IllegalArgumentException("Date can't be null"));
}
if ( times < 2 && times != -1 ) {
return new InternalScheduleOptions(new IllegalArgumentException("Times argument must be higher than 1 or -1"));
}
if ( period < 1 ) {
return new InternalScheduleOptions(new IllegalArgumentException("Period argument must be higher than 0"));
}
final SimpleScheduleBuilder sb;
if ( times == -1 ) {
sb = SimpleScheduleBuilder.simpleSchedule().repeatForever();
} else {
sb = SimpleScheduleBuilder.simpleSchedule().withRepeatCount(times - 1);
}
return new InternalScheduleOptions( TriggerBuilder.newTrigger()
.startAt(date)
.withSchedule(sb.withIntervalInMilliseconds(period * 1000)));
}
/**
* @see org.apache.sling.commons.scheduler.Scheduler#EXPR(java.lang.String)
*/
public ScheduleOptions EXPR(final String expression) {
if ( expression == null ) {
return new InternalScheduleOptions(new IllegalArgumentException("Expression can't be null"));
}
if ( !CronExpression.isValidExpression(expression) ) {
return new InternalScheduleOptions(new IllegalArgumentException("Expressionis invalid : " + expression));
}
return new InternalScheduleOptions( TriggerBuilder.newTrigger()
.withSchedule(CronScheduleBuilder.cronSchedule(expression)));
}
/**
* @see org.apache.sling.commons.scheduler.Scheduler#schedule(java.lang.Object, org.apache.sling.commons.scheduler.ScheduleOptions)
*/
public boolean schedule(final Long bundleId, final Long serviceId, final Object job, final ScheduleOptions options) {
try {
this.scheduleJob(bundleId, serviceId, job, options);
return true;
} catch (final IllegalArgumentException iae) {
// ignore this and return false
return false;
} catch (final SchedulerException se) {
// ignore this and return false
return false;
}
}
/**
* @see org.apache.sling.commons.scheduler.Scheduler#unschedule(java.lang.String)
*/
public boolean unschedule(final Long bundleId, final String jobName) {
if ( jobName != null ) {
synchronized ( this.schedulers ) {
for(final SchedulerProxy proxy : this.schedulers.values()) {
try {
final JobKey key = JobKey.jobKey(jobName);
final JobDetail jobdetail = proxy.getScheduler().getJobDetail(key);
if (jobdetail != null) {
proxy.getScheduler().deleteJob(key);
this.logger.debug("Unscheduling job with name {}", jobName);
return true;
}
} catch (final SchedulerException ignored) {
// ignore
}
}
}
}
return false;
}
/**
* Internal method to schedule a job
* @throws SchedulerException if the job can't be scheduled
* @throws IllegalArgumentException If the preconditions are not met
*/
private void scheduleJob(final Long bundleId, final Long serviceId, final Object job, final ScheduleOptions options)
throws SchedulerException {
this.checkJob(job);
if ( !(options instanceof InternalScheduleOptions)) {
throw new IllegalArgumentException("Options has not been created via schedule or is null.");
}
final InternalScheduleOptions opts = (InternalScheduleOptions)options;
if ( opts.argumentException != null ) {
throw opts.argumentException;
}
synchronized ( this.schedulers ) {
// as this method might be called from unbind and during
// unbind a deactivate could happen, we check the scheduler first
final String poolName = getThreadPoolName(opts.threadPoolName);
if (poolName.equals(this.defaultPoolName) && defaultThreadPoolLogger.isDebugEnabled()) {
logUsageOfDefaultThreadPool(opts);
}
SchedulerProxy proxy = null;
synchronized ( this.schedulers ) {
if ( this.active ) {
proxy = this.schedulers.get(poolName);
if ( proxy == null ) {
proxy = new SchedulerProxy(this.threadPoolManager, poolName);
this.schedulers.put(poolName, proxy);
}
}
}
if ( proxy == null ) {
throw new IllegalStateException("Scheduler is not available anymore.");
}
final String name;
if ( opts.name != null ) {
// if there is already a job with the name, remove it first
this.unschedule(bundleId, opts.name);
name = opts.name;
} else {
name = job.getClass().getName() + ':' + UUID.randomUUID();
}
final Trigger trigger = opts.trigger.withIdentity(name).build();
// create the data map
final JobDataMap jobDataMap = this.initDataMap(bundleId, serviceId, name, job, opts);
final JobDetail detail = this.createJobDetail(name, jobDataMap, opts.canRunConcurrently);
this.logger.debug("Scheduling job {} with name {} and trigger {}", new Object[] {job, name, trigger});
proxy.getScheduler().scheduleJob(detail, trigger);
}
}
/**
* This is used by the web console plugin
* @return All current schedulers
*/
Map<String, SchedulerProxy> getSchedulers() {
synchronized ( this.schedulers ) {
return new HashMap<>(this.schedulers);
}
}
/**
* Log a stacktrace that the default threadpool is used
*/
private void logUsageOfDefaultThreadPool(InternalScheduleOptions options) {
String location = "";
if (!StringUtils.isEmpty(options.componenentName)) {
location = "defined by OSGI annotations on component with name " + options.componenentName;
} else {
try {
throw new IllegalArgumentException();
} catch (IllegalArgumentException e) {
String[] stackFrames = ExceptionUtils.getStackFrames(e);
StringBuilder stackExcerpt = new StringBuilder();
// skip the first line, it contains the name of the exception
int depth = Math.min(STACKTRACE_DEPTH, stackFrames.length);
for (int i=1;i< depth ;i++) {
stackExcerpt.append(stackFrames[i]).append("\n");
}
location = "triggered by this code:\n" + stackExcerpt.toString();
}
}
defaultThreadPoolLogger.debug("Scheduled job using the default threadpool; {}",
location);
}
}