SLING-5387 : Provide support for running singleton jobs on non leader cluster nodes also

git-svn-id: https://svn.apache.org/repos/asf/sling/trunk@1792848 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java b/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java
index eed8be5..f84fe76 100644
--- a/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java
+++ b/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java
@@ -16,10 +16,15 @@
  */
 package org.apache.sling.commons.scheduler.impl;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.sling.commons.scheduler.JobContext;
 import org.apache.sling.commons.scheduler.Scheduler;
@@ -48,43 +53,91 @@
     /** Is this instance the leader? */
     public static final AtomicBoolean IS_LEADER = new AtomicBoolean(true);
 
-    /**
-     * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
-     */
-    public void execute(final JobExecutionContext context) throws JobExecutionException {
+    /** The available Sling IDs */
+    public static final AtomicReference<String[]> SLING_IDS = new AtomicReference<>(null);
 
-        final JobDataMap data = context.getJobDetail().getJobDataMap();
-        final Object job = data.get(QuartzScheduler.DATA_MAP_OBJECT);
-        final Logger logger = (Logger)data.get(QuartzScheduler.DATA_MAP_LOGGER);
+    private boolean checkDiscoveryAvailable(final Logger logger,
+            final Object job,
+            final String name,
+            final String[] runOn) {
+        if ( DISCOVERY_AVAILABLE.get() ) {
+            if ( DISCOVERY_INFO_AVAILABLE.get() ) {
+                return true;
+            } else {
+                logger.debug("No discovery info available. Excluding job {} with name {} and config {}.",
+                        new Object[] {job, name, runOn[0]});
+                return false;
+            }
+        } else {
+            logger.debug("No discovery available, therefore not executing job {} with name {} and config {}.",
+                    new Object[] {job, name, runOn[0]});
+            return false;
+        }
+    }
 
-        // check run on information
-        final String[] runOn = (String[])data.get(QuartzScheduler.DATA_MAP_RUN_ON);
+    private String checkSlingId(final Logger logger,
+            final Object job,
+            final String name,
+            final String[] runOn) {
+        final String myId = SLING_ID;
+        if ( myId == null ) {
+            logger.error("No Sling ID available, therefore not executing job {} with name {} and config {}.",
+                    new Object[] {job, name, Arrays.toString(runOn)});
+            return null;
+        }
+        return myId;
+    }
+
+    private boolean shouldRun(final Logger logger,
+            final Object job,
+            final String name,
+            final String[] runOn) {
         if ( runOn != null ) {
-            if ( runOn.length == 1 &&
-                 (Scheduler.VALUE_RUN_ON_LEADER.equals(runOn[0]) || Scheduler.VALUE_RUN_ON_SINGLE.equals(runOn[0])) ) {
-                if ( DISCOVERY_AVAILABLE.get() ) {
-                    if ( DISCOVERY_INFO_AVAILABLE.get() ) {
-                        if ( !IS_LEADER.get() ) {
-                            logger.debug("Excluding job {} with name {} and config {} - instance is not leader",
-                                    new Object[] {job, data.get(QuartzScheduler.DATA_MAP_NAME), runOn[0]});
-                            return;
-                        }
-                    } else {
-                        logger.debug("No discovery info available. Excluding job {} with name {} and config {}.",
-                                new Object[] {job, data.get(QuartzScheduler.DATA_MAP_NAME), runOn[0]});
-                        return;
+            if ( runOn.length == 1 && Scheduler.VALUE_RUN_ON_LEADER.equals(runOn[0]) ) {
+                // leader
+                if ( !checkDiscoveryAvailable(logger, job, name, runOn) ) {
+                    return false;
+                }
+                if ( !IS_LEADER.get() ) {
+                    logger.debug("Excluding job {} with name {} and config {} - instance is not leader",
+                            new Object[] {job, name, runOn[0]});
+                    return false;
+                }
+            } else if ( runOn.length == 1 && Scheduler.VALUE_RUN_ON_SINGLE.equals(runOn[0]) ) {
+                // single instance
+                if ( !checkDiscoveryAvailable(logger, job, name, runOn) ) {
+                    return false;
+                }
+                final String myId = checkSlingId(logger, job, name, runOn);
+                if ( myId == null ) {
+                    return false;
+                }
+                final String[] ids = QuartzJobExecutor.SLING_IDS.get();
+                boolean schedule = false;
+                if ( ids != null ) {
+                    int index = 0;
+                    try {
+                        final MessageDigest m = MessageDigest.getInstance("MD5");
+                        m.reset();
+                        m.update(job.getClass().getName().getBytes("UTF-8"));
+                        index = new BigInteger(1, m.digest()).mod(BigInteger.valueOf(ids.length)).intValue();
+                    } catch ( final IOException | NoSuchAlgorithmException ex ) {
+                        // although this should never happen (MD5 and UTF-8 are always available) we consider
+                        // this an error case
+                        logger.error("Unable to distribute scheduled job " + job + " with name " + name, ex);
+                        return false;
                     }
-                } else {
-                    logger.debug("No discovery available, therefore not executing job {} with name {} and config {}.",
-                            new Object[] {job, data.get(QuartzScheduler.DATA_MAP_NAME), runOn[0]});
-                    return;
+                    schedule = myId.equals(ids[index]);
+                }
+                if ( !schedule ) {
+                    logger.debug("Excluding job {} with name {} and config {} - distributed to different Sling instance",
+                            new Object[] {job, name, runOn});
+                    return false;
                 }
             } else { // sling IDs
-                final String myId = SLING_ID;
+                final String myId = checkSlingId(logger, job, name, runOn);
                 if ( myId == null ) {
-                    logger.error("No Sling ID available, therefore not executing job {} with name {} and config {}.",
-                            new Object[] {job, data.get(QuartzScheduler.DATA_MAP_NAME), Arrays.toString(runOn)});
-                    return;
+                    return false;
                 } else {
                     boolean schedule = false;
                     for(final String id : runOn ) {
@@ -95,14 +148,31 @@
                     }
                     if ( !schedule ) {
                         logger.debug("Excluding job {} with name {} and config {} - different Sling ID",
-                                new Object[] {job, data.get(QuartzScheduler.DATA_MAP_NAME), Arrays.toString(runOn)});
-                        return;
+                                new Object[] {job, name, Arrays.toString(runOn)});
+                        return false;
                     }
                 }
             }
         }
+        return true;
+    }
 
+    /**
+     * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
+     */
+    @Override
+    public void execute(final JobExecutionContext context) throws JobExecutionException {
+
+        final JobDataMap data = context.getJobDetail().getJobDataMap();
+        final Object job = data.get(QuartzScheduler.DATA_MAP_OBJECT);
+        final Logger logger = (Logger)data.get(QuartzScheduler.DATA_MAP_LOGGER);
+
+        // check run on information
         final String name = (String) data.get(QuartzScheduler.DATA_MAP_NAME);
+        if ( !shouldRun(logger, job, name, (String[])data.get(QuartzScheduler.DATA_MAP_RUN_ON)) ) {
+            return;
+        }
+
         String origThreadName = Thread.currentThread().getName();
         try {
             Thread.currentThread().setName(origThreadName + "-" + name);
@@ -144,6 +214,7 @@
         /**
          * @see org.apache.sling.commons.scheduler.JobContext#getConfiguration()
          */
+        @Override
         public Map<String, Serializable> getConfiguration() {
             return this.configuration;
         }
@@ -151,6 +222,7 @@
         /**
          * @see org.apache.sling.commons.scheduler.JobContext#getName()
          */
+        @Override
         public String getName() {
             return this.name;
         }
diff --git a/src/main/java/org/apache/sling/commons/scheduler/impl/TopologyHandler.java b/src/main/java/org/apache/sling/commons/scheduler/impl/TopologyHandler.java
index 90dddca..fce6bd1 100644
--- a/src/main/java/org/apache/sling/commons/scheduler/impl/TopologyHandler.java
+++ b/src/main/java/org/apache/sling/commons/scheduler/impl/TopologyHandler.java
@@ -16,6 +16,11 @@
  */
 package org.apache.sling.commons.scheduler.impl;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.sling.discovery.InstanceDescription;
 import org.apache.sling.discovery.TopologyEvent;
 import org.apache.sling.discovery.TopologyEvent.Type;
 import org.apache.sling.discovery.TopologyEventListener;
@@ -52,11 +57,18 @@
     @Override
     public void handleTopologyEvent(final TopologyEvent event) {
         if ( event.getType() == Type.TOPOLOGY_INIT || event.getType() == Type.TOPOLOGY_CHANGED ) {
+            final List<String> ids = new ArrayList<>();
+            for(final InstanceDescription desc : event.getNewView().getInstances()) {
+                ids.add(desc.getSlingId());
+            }
+            Collections.sort(ids);
             QuartzJobExecutor.IS_LEADER.set(event.getNewView().getLocalInstance().isLeader());
             QuartzJobExecutor.DISCOVERY_INFO_AVAILABLE.set(true);
+            QuartzJobExecutor.SLING_IDS.set(ids.toArray(new String[ids.size()]));
         } else if ( event.getType() == Type.TOPOLOGY_CHANGING ) {
             QuartzJobExecutor.IS_LEADER.set(false);
             QuartzJobExecutor.DISCOVERY_INFO_AVAILABLE.set(false);
+            QuartzJobExecutor.SLING_IDS.set(null);
         }
     }
 }
diff --git a/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java b/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
index 18d80aa..a61fc71 100644
--- a/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
+++ b/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.sling.commons.scheduler.impl;
 
+import java.util.Arrays;
 import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -122,8 +123,12 @@
     private String getServiceIdentifier(final ServiceReference<?> ref) {
         String name = getStringProperty(ref, Scheduler.PROPERTY_SCHEDULER_NAME);
         if ( name == null ) {
-            name = getStringProperty(ref, Constants.SERVICE_PID);
-            if ( name == null ) {
+            final Object pid = ref.getProperty(Constants.SERVICE_PID);
+            if ( pid instanceof String ) {
+                name = (String)pid;
+            } else if ( pid instanceof String[] ) {
+                name = Arrays.toString((String[])pid);
+            } else {
                 name = "Registered Service";
             }
         }