SLING-5387 : Provide support for running singleton jobs on non leader cluster nodes also
git-svn-id: https://svn.apache.org/repos/asf/sling/trunk@1792848 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java b/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java
index eed8be5..f84fe76 100644
--- a/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java
+++ b/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java
@@ -16,10 +16,15 @@
*/
package org.apache.sling.commons.scheduler.impl;
+import java.io.IOException;
import java.io.Serializable;
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.sling.commons.scheduler.JobContext;
import org.apache.sling.commons.scheduler.Scheduler;
@@ -48,43 +53,91 @@
/** Is this instance the leader? */
public static final AtomicBoolean IS_LEADER = new AtomicBoolean(true);
- /**
- * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
- */
- public void execute(final JobExecutionContext context) throws JobExecutionException {
+ /** The available Sling IDs */
+ public static final AtomicReference<String[]> SLING_IDS = new AtomicReference<>(null);
- final JobDataMap data = context.getJobDetail().getJobDataMap();
- final Object job = data.get(QuartzScheduler.DATA_MAP_OBJECT);
- final Logger logger = (Logger)data.get(QuartzScheduler.DATA_MAP_LOGGER);
+ private boolean checkDiscoveryAvailable(final Logger logger,
+ final Object job,
+ final String name,
+ final String[] runOn) {
+ if ( DISCOVERY_AVAILABLE.get() ) {
+ if ( DISCOVERY_INFO_AVAILABLE.get() ) {
+ return true;
+ } else {
+ logger.debug("No discovery info available. Excluding job {} with name {} and config {}.",
+ new Object[] {job, name, runOn[0]});
+ return false;
+ }
+ } else {
+ logger.debug("No discovery available, therefore not executing job {} with name {} and config {}.",
+ new Object[] {job, name, runOn[0]});
+ return false;
+ }
+ }
- // check run on information
- final String[] runOn = (String[])data.get(QuartzScheduler.DATA_MAP_RUN_ON);
+ private String checkSlingId(final Logger logger,
+ final Object job,
+ final String name,
+ final String[] runOn) {
+ final String myId = SLING_ID;
+ if ( myId == null ) {
+ logger.error("No Sling ID available, therefore not executing job {} with name {} and config {}.",
+ new Object[] {job, name, Arrays.toString(runOn)});
+ return null;
+ }
+ return myId;
+ }
+
+ private boolean shouldRun(final Logger logger,
+ final Object job,
+ final String name,
+ final String[] runOn) {
if ( runOn != null ) {
- if ( runOn.length == 1 &&
- (Scheduler.VALUE_RUN_ON_LEADER.equals(runOn[0]) || Scheduler.VALUE_RUN_ON_SINGLE.equals(runOn[0])) ) {
- if ( DISCOVERY_AVAILABLE.get() ) {
- if ( DISCOVERY_INFO_AVAILABLE.get() ) {
- if ( !IS_LEADER.get() ) {
- logger.debug("Excluding job {} with name {} and config {} - instance is not leader",
- new Object[] {job, data.get(QuartzScheduler.DATA_MAP_NAME), runOn[0]});
- return;
- }
- } else {
- logger.debug("No discovery info available. Excluding job {} with name {} and config {}.",
- new Object[] {job, data.get(QuartzScheduler.DATA_MAP_NAME), runOn[0]});
- return;
+ if ( runOn.length == 1 && Scheduler.VALUE_RUN_ON_LEADER.equals(runOn[0]) ) {
+ // leader
+ if ( !checkDiscoveryAvailable(logger, job, name, runOn) ) {
+ return false;
+ }
+ if ( !IS_LEADER.get() ) {
+ logger.debug("Excluding job {} with name {} and config {} - instance is not leader",
+ new Object[] {job, name, runOn[0]});
+ return false;
+ }
+ } else if ( runOn.length == 1 && Scheduler.VALUE_RUN_ON_SINGLE.equals(runOn[0]) ) {
+ // single instance
+ if ( !checkDiscoveryAvailable(logger, job, name, runOn) ) {
+ return false;
+ }
+ final String myId = checkSlingId(logger, job, name, runOn);
+ if ( myId == null ) {
+ return false;
+ }
+ final String[] ids = QuartzJobExecutor.SLING_IDS.get();
+ boolean schedule = false;
+ if ( ids != null ) {
+ int index = 0;
+ try {
+ final MessageDigest m = MessageDigest.getInstance("MD5");
+ m.reset();
+ m.update(job.getClass().getName().getBytes("UTF-8"));
+ index = new BigInteger(1, m.digest()).mod(BigInteger.valueOf(ids.length)).intValue();
+ } catch ( final IOException | NoSuchAlgorithmException ex ) {
+ // although this should never happen (MD5 and UTF-8 are always available) we consider
+ // this an error case
+ logger.error("Unable to distribute scheduled job " + job + " with name " + name, ex);
+ return false;
}
- } else {
- logger.debug("No discovery available, therefore not executing job {} with name {} and config {}.",
- new Object[] {job, data.get(QuartzScheduler.DATA_MAP_NAME), runOn[0]});
- return;
+ schedule = myId.equals(ids[index]);
+ }
+ if ( !schedule ) {
+ logger.debug("Excluding job {} with name {} and config {} - distributed to different Sling instance",
+ new Object[] {job, name, runOn});
+ return false;
}
} else { // sling IDs
- final String myId = SLING_ID;
+ final String myId = checkSlingId(logger, job, name, runOn);
if ( myId == null ) {
- logger.error("No Sling ID available, therefore not executing job {} with name {} and config {}.",
- new Object[] {job, data.get(QuartzScheduler.DATA_MAP_NAME), Arrays.toString(runOn)});
- return;
+ return false;
} else {
boolean schedule = false;
for(final String id : runOn ) {
@@ -95,14 +148,31 @@
}
if ( !schedule ) {
logger.debug("Excluding job {} with name {} and config {} - different Sling ID",
- new Object[] {job, data.get(QuartzScheduler.DATA_MAP_NAME), Arrays.toString(runOn)});
- return;
+ new Object[] {job, name, Arrays.toString(runOn)});
+ return false;
}
}
}
}
+ return true;
+ }
+ /**
+ * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
+ */
+ @Override
+ public void execute(final JobExecutionContext context) throws JobExecutionException {
+
+ final JobDataMap data = context.getJobDetail().getJobDataMap();
+ final Object job = data.get(QuartzScheduler.DATA_MAP_OBJECT);
+ final Logger logger = (Logger)data.get(QuartzScheduler.DATA_MAP_LOGGER);
+
+ // check run on information
final String name = (String) data.get(QuartzScheduler.DATA_MAP_NAME);
+ if ( !shouldRun(logger, job, name, (String[])data.get(QuartzScheduler.DATA_MAP_RUN_ON)) ) {
+ return;
+ }
+
String origThreadName = Thread.currentThread().getName();
try {
Thread.currentThread().setName(origThreadName + "-" + name);
@@ -144,6 +214,7 @@
/**
* @see org.apache.sling.commons.scheduler.JobContext#getConfiguration()
*/
+ @Override
public Map<String, Serializable> getConfiguration() {
return this.configuration;
}
@@ -151,6 +222,7 @@
/**
* @see org.apache.sling.commons.scheduler.JobContext#getName()
*/
+ @Override
public String getName() {
return this.name;
}
diff --git a/src/main/java/org/apache/sling/commons/scheduler/impl/TopologyHandler.java b/src/main/java/org/apache/sling/commons/scheduler/impl/TopologyHandler.java
index 90dddca..fce6bd1 100644
--- a/src/main/java/org/apache/sling/commons/scheduler/impl/TopologyHandler.java
+++ b/src/main/java/org/apache/sling/commons/scheduler/impl/TopologyHandler.java
@@ -16,6 +16,11 @@
*/
package org.apache.sling.commons.scheduler.impl;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.TopologyEvent;
import org.apache.sling.discovery.TopologyEvent.Type;
import org.apache.sling.discovery.TopologyEventListener;
@@ -52,11 +57,18 @@
@Override
public void handleTopologyEvent(final TopologyEvent event) {
if ( event.getType() == Type.TOPOLOGY_INIT || event.getType() == Type.TOPOLOGY_CHANGED ) {
+ final List<String> ids = new ArrayList<>();
+ for(final InstanceDescription desc : event.getNewView().getInstances()) {
+ ids.add(desc.getSlingId());
+ }
+ Collections.sort(ids);
QuartzJobExecutor.IS_LEADER.set(event.getNewView().getLocalInstance().isLeader());
QuartzJobExecutor.DISCOVERY_INFO_AVAILABLE.set(true);
+ QuartzJobExecutor.SLING_IDS.set(ids.toArray(new String[ids.size()]));
} else if ( event.getType() == Type.TOPOLOGY_CHANGING ) {
QuartzJobExecutor.IS_LEADER.set(false);
QuartzJobExecutor.DISCOVERY_INFO_AVAILABLE.set(false);
+ QuartzJobExecutor.SLING_IDS.set(null);
}
}
}
diff --git a/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java b/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
index 18d80aa..a61fc71 100644
--- a/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
+++ b/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
@@ -16,6 +16,7 @@
*/
package org.apache.sling.commons.scheduler.impl;
+import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -122,8 +123,12 @@
private String getServiceIdentifier(final ServiceReference<?> ref) {
String name = getStringProperty(ref, Scheduler.PROPERTY_SCHEDULER_NAME);
if ( name == null ) {
- name = getStringProperty(ref, Constants.SERVICE_PID);
- if ( name == null ) {
+ final Object pid = ref.getProperty(Constants.SERVICE_PID);
+ if ( pid instanceof String ) {
+ name = (String)pid;
+ } else if ( pid instanceof String[] ) {
+ name = Arrays.toString((String[])pid);
+ } else {
name = "Registered Service";
}
}