blob: f84fe768e29b2776e17115396c3440c80056e7dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sling.commons.scheduler.impl;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.sling.commons.scheduler.JobContext;
import org.apache.sling.commons.scheduler.Scheduler;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
/**
* This component is responsible to launch a {@link org.apache.sling.commons.scheduler.Job}
* or {@link Runnable} in a Quartz Scheduler.
*
*/
public class QuartzJobExecutor implements Job {
/** Is discovery available? */
public static final AtomicBoolean DISCOVERY_AVAILABLE = new AtomicBoolean(false);
/** Is stable discovery information available? */
public static final AtomicBoolean DISCOVERY_INFO_AVAILABLE = new AtomicBoolean(false);
/** The id of the current instance (if settings service is available. */
public static volatile String SLING_ID;
/** Is this instance the leader? */
public static final AtomicBoolean IS_LEADER = new AtomicBoolean(true);
/** The available Sling IDs */
public static final AtomicReference<String[]> SLING_IDS = new AtomicReference<>(null);
private boolean checkDiscoveryAvailable(final Logger logger,
final Object job,
final String name,
final String[] runOn) {
if ( DISCOVERY_AVAILABLE.get() ) {
if ( DISCOVERY_INFO_AVAILABLE.get() ) {
return true;
} else {
logger.debug("No discovery info available. Excluding job {} with name {} and config {}.",
new Object[] {job, name, runOn[0]});
return false;
}
} else {
logger.debug("No discovery available, therefore not executing job {} with name {} and config {}.",
new Object[] {job, name, runOn[0]});
return false;
}
}
private String checkSlingId(final Logger logger,
final Object job,
final String name,
final String[] runOn) {
final String myId = SLING_ID;
if ( myId == null ) {
logger.error("No Sling ID available, therefore not executing job {} with name {} and config {}.",
new Object[] {job, name, Arrays.toString(runOn)});
return null;
}
return myId;
}
private boolean shouldRun(final Logger logger,
final Object job,
final String name,
final String[] runOn) {
if ( runOn != null ) {
if ( runOn.length == 1 && Scheduler.VALUE_RUN_ON_LEADER.equals(runOn[0]) ) {
// leader
if ( !checkDiscoveryAvailable(logger, job, name, runOn) ) {
return false;
}
if ( !IS_LEADER.get() ) {
logger.debug("Excluding job {} with name {} and config {} - instance is not leader",
new Object[] {job, name, runOn[0]});
return false;
}
} else if ( runOn.length == 1 && Scheduler.VALUE_RUN_ON_SINGLE.equals(runOn[0]) ) {
// single instance
if ( !checkDiscoveryAvailable(logger, job, name, runOn) ) {
return false;
}
final String myId = checkSlingId(logger, job, name, runOn);
if ( myId == null ) {
return false;
}
final String[] ids = QuartzJobExecutor.SLING_IDS.get();
boolean schedule = false;
if ( ids != null ) {
int index = 0;
try {
final MessageDigest m = MessageDigest.getInstance("MD5");
m.reset();
m.update(job.getClass().getName().getBytes("UTF-8"));
index = new BigInteger(1, m.digest()).mod(BigInteger.valueOf(ids.length)).intValue();
} catch ( final IOException | NoSuchAlgorithmException ex ) {
// although this should never happen (MD5 and UTF-8 are always available) we consider
// this an error case
logger.error("Unable to distribute scheduled job " + job + " with name " + name, ex);
return false;
}
schedule = myId.equals(ids[index]);
}
if ( !schedule ) {
logger.debug("Excluding job {} with name {} and config {} - distributed to different Sling instance",
new Object[] {job, name, runOn});
return false;
}
} else { // sling IDs
final String myId = checkSlingId(logger, job, name, runOn);
if ( myId == null ) {
return false;
} else {
boolean schedule = false;
for(final String id : runOn ) {
if ( myId.equals(id) ) {
schedule = true;
break;
}
}
if ( !schedule ) {
logger.debug("Excluding job {} with name {} and config {} - different Sling ID",
new Object[] {job, name, Arrays.toString(runOn)});
return false;
}
}
}
}
return true;
}
/**
* @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
*/
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
final JobDataMap data = context.getJobDetail().getJobDataMap();
final Object job = data.get(QuartzScheduler.DATA_MAP_OBJECT);
final Logger logger = (Logger)data.get(QuartzScheduler.DATA_MAP_LOGGER);
// check run on information
final String name = (String) data.get(QuartzScheduler.DATA_MAP_NAME);
if ( !shouldRun(logger, job, name, (String[])data.get(QuartzScheduler.DATA_MAP_RUN_ON)) ) {
return;
}
String origThreadName = Thread.currentThread().getName();
try {
Thread.currentThread().setName(origThreadName + "-" + name);
logger.debug("Executing job {} with name {}", job, data.get(QuartzScheduler.DATA_MAP_NAME));
if (job instanceof org.apache.sling.commons.scheduler.Job) {
@SuppressWarnings("unchecked")
final Map<String, Serializable> configuration = (Map<String, Serializable>) data.get(QuartzScheduler.DATA_MAP_CONFIGURATION);
final JobContext jobCtx = new JobContextImpl(name, configuration);
((org.apache.sling.commons.scheduler.Job) job).execute(jobCtx);
} else if (job instanceof Runnable) {
((Runnable) job).run();
} else {
logger.error("Scheduled job {} is neither a job nor a runnable.", job);
}
} catch (final Throwable t) {
// if this is a quartz exception, rethrow it
if (t instanceof JobExecutionException) {
throw (JobExecutionException) t;
}
// there is nothing we can do here, so we just log
logger.error("Exception during job execution of " + job + " : " + t.getMessage(), t);
} finally {
Thread.currentThread().setName(origThreadName);
}
}
public static final class JobContextImpl implements JobContext {
protected final Map<String, Serializable> configuration;
protected final String name;
public JobContextImpl(String name, Map<String, Serializable> config) {
this.name = name;
this.configuration = config;
}
/**
* @see org.apache.sling.commons.scheduler.JobContext#getConfiguration()
*/
@Override
public Map<String, Serializable> getConfiguration() {
return this.configuration;
}
/**
* @see org.apache.sling.commons.scheduler.JobContext#getName()
*/
@Override
public String getName() {
return this.name;
}
}
}