| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.sling.event.impl; |
| |
| import java.util.ArrayList; |
| import java.util.Calendar; |
| import java.util.Collection; |
| import java.util.Date; |
| import java.util.Dictionary; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.BlockingQueue; |
| |
| import javax.jcr.Item; |
| import javax.jcr.ItemExistsException; |
| import javax.jcr.Node; |
| import javax.jcr.NodeIterator; |
| import javax.jcr.RepositoryException; |
| import javax.jcr.Session; |
| import javax.jcr.Value; |
| import javax.jcr.observation.EventIterator; |
| import javax.jcr.query.Query; |
| import javax.jcr.query.QueryManager; |
| |
| import org.apache.jackrabbit.util.ISO8601; |
| import org.apache.sling.commons.osgi.OsgiUtil; |
| import org.apache.sling.commons.scheduler.Scheduler; |
| import org.apache.sling.commons.threads.ThreadPool; |
| import org.apache.sling.event.EventPropertiesMap; |
| import org.apache.sling.event.EventUtil; |
| import org.apache.sling.event.JobStatusProvider; |
| import org.osgi.framework.Constants; |
| import org.osgi.service.component.ComponentConstants; |
| import org.osgi.service.component.ComponentContext; |
| import org.osgi.service.event.Event; |
| import org.osgi.service.event.EventAdmin; |
| |
| |
| /** |
| * An event handler for special job events. |
| * |
| * @scr.component label="%job.events.name" description="%job.events.description" immediate="true" |
| * @scr.service interface="org.apache.sling.event.JobStatusProvider" |
| * @scr.property name="event.topics" refValues="EventUtil.TOPIC_JOB" |
| * values.updated="org/osgi/framework/BundleEvent/UPDATED" |
| * values.started="org/osgi/framework/BundleEvent/STARTED" |
| * private="true" |
| * @scr.property name="repository.path" value="/var/eventing/jobs" private="true" |
| * We schedule this event handler to run in the background and clean up |
| * obsolete events. |
| * @scr.service interface="java.lang.Runnable" |
| * @scr.property name="scheduler.period" value="300" type="Long" label="%jobscheduler.period.name" description="%jobscheduler.period.description" |
| * @scr.property name="scheduler.concurrent" value="false" type="Boolean" private="true" |
| */ |
| public class JobEventHandler |
| extends AbstractRepositoryEventHandler |
| implements EventUtil.JobStatusNotifier, JobStatusProvider, Runnable { |
| |
| /** A map for keeping track of currently processed job topics. */ |
| private final Map<String, Boolean> processingMap = new HashMap<String, Boolean>(); |
| |
| /** A map for the different job queues. */ |
| private final Map<String, JobBlockingQueue> jobQueues = new HashMap<String, JobBlockingQueue>(); |
| |
| /** Default sleep time. */ |
| private static final long DEFAULT_SLEEP_TIME = 30; |
| |
| /** @scr.property valueRef="DEFAULT_SLEEP_TIME" */ |
| private static final String CONFIG_PROPERTY_SLEEP_TIME = "sleep.time"; |
| |
| /** Default number of job retries. */ |
| private static final int DEFAULT_MAX_JOB_RETRIES = 10; |
| |
| /** @scr.property valueRef="DEFAULT_MAX_JOB_RETRIES" */ |
| private static final String CONFIG_PROPERTY_MAX_JOB_RETRIES = "max.job.retries"; |
| |
| /** Default number of seconds to wait for an ack. */ |
| private static final long DEFAULT_WAIT_FOR_ACK = 90; // by default we wait 90 secs |
| |
| /** @scr.property valueRef="DEFAULT_MAXIMUM_PARALLEL_JOBS" */ |
| private static final String CONFIG_PROPERTY_MAXIMUM_PARALLEL_JOBS = "max.parallel.jobs"; |
| |
| /** Default nubmer of parallel jobs. */ |
| private static final long DEFAULT_MAXIMUM_PARALLEL_JOBS = 15; |
| |
| /** @scr.property valueRef="DEFAULT_WAIT_FOR_ACK" */ |
| private static final String CONFIG_PROPERTY_WAIT_FOR_ACK = "wait.for.ack"; |
| |
| /** We check every 30 secs by default. */ |
| private long sleepTime; |
| |
| /** How often should a job be retried by default. */ |
| private int maxJobRetries; |
| |
| /** How long do we wait for an ack (in ms) */ |
| private long waitForAckMs; |
| |
| /** Maximum parallel running jobs for a single queue. */ |
| private long maximumParallelJobs; |
| |
| /** Background session. */ |
| private Session backgroundSession; |
| |
| /** Unloaded jobs. */ |
| private Set<String>unloadedJobs = new HashSet<String>(); |
| |
| /** List of deleted jobs. */ |
| private Set<String>deletedJobs = new HashSet<String>(); |
| |
| /** Default clean up time is 5 minutes. */ |
| private static final int DEFAULT_CLEANUP_PERIOD = 5; |
| |
| /** @scr.property valueRef="DEFAULT_CLEANUP_PERIOD" type="Integer" label="%jobcleanup.period.name" description="%jobcleanup.period.description" */ |
| private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period"; |
| |
| /** We remove everything which is older than 5 min by default. */ |
| private int cleanupPeriod = DEFAULT_CLEANUP_PERIOD; |
| |
| /** The scheduler for rescheduling jobs. @scr.reference */ |
| private Scheduler scheduler; |
| |
| /** Our component context. */ |
| private ComponentContext componentContext; |
| |
| /** The map of events we're currently processing. */ |
| private final Map<String, StartedJobInfo> processingEventsList = new HashMap<String, StartedJobInfo>(); |
| |
| public static ThreadPool JOB_THREAD_POOL; |
| |
| /** Sync lock */ |
| private final Object writeLock = new Object(); |
| |
| /** Sync lock */ |
| private final Object backgroundLock = new Object(); |
| |
| /** Number of parallel jobs for the main queue. */ |
| private long parallelJobCount; |
| |
| /** |
| * Activate this component. |
| * @param context |
| * @throws RepositoryException |
| */ |
| protected void activate(final ComponentContext context) |
| throws Exception { |
| @SuppressWarnings("unchecked") |
| final Dictionary<String, Object> props = context.getProperties(); |
| this.cleanupPeriod = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD); |
| this.sleepTime = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_SLEEP_TIME), DEFAULT_SLEEP_TIME); |
| this.maxJobRetries = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAX_JOB_RETRIES), DEFAULT_MAX_JOB_RETRIES); |
| this.waitForAckMs = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_WAIT_FOR_ACK), DEFAULT_WAIT_FOR_ACK) * 1000; |
| this.maximumParallelJobs = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_MAXIMUM_PARALLEL_JOBS), DEFAULT_MAXIMUM_PARALLEL_JOBS); |
| this.componentContext = context; |
| super.activate(context); |
| JOB_THREAD_POOL = this.threadPool; |
| } |
| |
| /** |
| * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#deactivate(org.osgi.service.component.ComponentContext) |
| */ |
| protected void deactivate(final ComponentContext context) { |
| super.deactivate(context); |
| synchronized ( this.jobQueues ) { |
| final Iterator<JobBlockingQueue> i = this.jobQueues.values().iterator(); |
| while ( i.hasNext() ) { |
| final JobBlockingQueue jbq = i.next(); |
| // wake up qeue |
| if ( jbq.isWaiting() ) { |
| synchronized ( jbq.getLock()) { |
| jbq.notifyFinish(null); |
| } |
| } |
| // continue queue processing |
| try { |
| jbq.put(new EventInfo()); |
| } catch (InterruptedException e) { |
| this.ignoreException(e); |
| } |
| } |
| } |
| if ( this.backgroundSession != null ) { |
| synchronized ( this.backgroundLock ) { |
| try { |
| this.backgroundSession.getWorkspace().getObservationManager().removeEventListener(this); |
| } catch (RepositoryException e) { |
| // we just ignore it |
| this.logger.warn("Unable to remove event listener.", e); |
| } |
| this.backgroundSession.logout(); |
| this.backgroundSession = null; |
| } |
| } |
| this.componentContext = null; |
| JOB_THREAD_POOL = null; |
| } |
| |
| /** |
| * Return the query string for the clean up. |
| */ |
| private String getCleanUpQueryString() { |
| final Calendar deleteBefore = Calendar.getInstance(); |
| deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod); |
| final String dateString = ISO8601.format(deleteBefore); |
| |
| final StringBuffer buffer = new StringBuffer("/jcr:root"); |
| buffer.append(this.repositoryPath); |
| buffer.append("//element(*, "); |
| buffer.append(getEventNodeType()); |
| buffer.append(")[@"); |
| buffer.append(EventHelper.NODE_PROPERTY_FINISHED); |
| buffer.append(" < xs:dateTime('"); |
| buffer.append(dateString); |
| buffer.append("')]"); |
| |
| return buffer.toString(); |
| } |
| |
| /** |
| * This method is invoked periodically. |
| * @see java.lang.Runnable#run() |
| */ |
| public void run() { |
| if ( this.running ) { |
| // check for jobs that were started but never got an aknowledge |
| final long tooOld = System.currentTimeMillis() - this.waitForAckMs; |
| // to keep the synchronized block as fast as possible we just store the |
| // jobs to be removed in a new list and process this list afterwards |
| final List<StartedJobInfo> restartJobs = new ArrayList<StartedJobInfo>(); |
| synchronized ( this.processingEventsList ) { |
| final Iterator<Map.Entry<String, StartedJobInfo>> i = this.processingEventsList.entrySet().iterator(); |
| while ( i.hasNext() ) { |
| final Map.Entry<String, StartedJobInfo> entry = i.next(); |
| if ( entry.getValue().started <= tooOld ) { |
| restartJobs.add(entry.getValue()); |
| } |
| } |
| } |
| // remove obsolete jobs from the repository |
| if ( this.cleanupPeriod > 0 ) { |
| this.logger.debug("Cleaning up repository, removing all finished jobs older than {} minutes.", this.cleanupPeriod); |
| |
| final String queryString = this.getCleanUpQueryString(); |
| // we create an own session for concurrency issues |
| Session s = null; |
| try { |
| s = this.createSession(); |
| final Node parentNode = (Node)s.getItem(this.repositoryPath); |
| logger.debug("Executing query {}", queryString); |
| final Query q = s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH); |
| final NodeIterator iter = q.execute().getNodes(); |
| int count = 0; |
| while ( iter.hasNext() ) { |
| final Node eventNode = iter.nextNode(); |
| eventNode.remove(); |
| count++; |
| } |
| parentNode.save(); |
| logger.debug("Removed {} entries from the repository.", count); |
| |
| } catch (RepositoryException e) { |
| // in the case of an error, we just log this as a warning |
| this.logger.warn("Exception during repository cleanup.", e); |
| } finally { |
| if ( s != null ) { |
| s.logout(); |
| } |
| } |
| } |
| // restart jobs is now a list of potential candidates, we now have to check |
| // each candidate separately again! |
| if ( restartJobs.size() > 0 ) { |
| try { |
| Thread.sleep(500); |
| } catch (InterruptedException e) { |
| // we just ignore this |
| e.printStackTrace(); |
| } |
| } |
| final Iterator<StartedJobInfo> jobIter = restartJobs.iterator(); |
| while ( jobIter.hasNext() ) { |
| final StartedJobInfo info = jobIter.next(); |
| boolean process = false; |
| synchronized ( this.processingEventsList ) { |
| process = this.processingEventsList.remove(info.nodePath) != null; |
| } |
| if ( process ) { |
| this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", info.event, info.nodePath); |
| this.finishedJob(info.event, info.nodePath, true); |
| } |
| } |
| |
| // check for idle threads |
| synchronized ( this.jobQueues ) { |
| final Iterator<Map.Entry<String, JobBlockingQueue>> i = this.jobQueues.entrySet().iterator(); |
| while ( i.hasNext() ) { |
| final Map.Entry<String, JobBlockingQueue> current = i.next(); |
| final JobBlockingQueue jbq = current.getValue(); |
| if ( jbq.size() == 0 ) { |
| if ( jbq.isMarkedForCleanUp() ) { |
| // set to finished |
| jbq.setFinished(true); |
| // wake up |
| try { |
| jbq.put(new EventInfo()); |
| } catch (InterruptedException e) { |
| this.ignoreException(e); |
| } |
| // remove |
| i.remove(); |
| } else { |
| // mark to be removed during next cycle |
| jbq.markForCleanUp(); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue() |
| */ |
| protected void processWriteQueue() { |
| while ( this.running ) { |
| // so let's wait/get the next job from the queue |
| Event event = null; |
| try { |
| event = this.writeQueue.take(); |
| } catch (InterruptedException e) { |
| // we ignore this |
| this.ignoreException(e); |
| } |
| if ( event != null && this.running ) { |
| logger.debug("Persisting job {}", event); |
| try { |
| this.writerSession.refresh(false); |
| } catch (RepositoryException re) { |
| // we just ignore this |
| this.ignoreException(re); |
| } |
| final EventInfo info = new EventInfo(); |
| info.event = event; |
| final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID); |
| final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC); |
| final String nodePath = this.getNodePath(jobTopic, jobId); |
| |
| // if the job has no job id, we can just write the job to the repo and don't |
| // need locking |
| if ( jobId == null ) { |
| try { |
| final Node eventNode = this.writeEvent(event, nodePath); |
| info.nodePath = eventNode.getPath(); |
| } catch (RepositoryException re ) { |
| // something went wrong, so let's log it |
| this.logger.error("Exception during writing new job '" + EventUtil.toString(event) + "' to repository at " + nodePath, re); |
| } |
| } else { |
| try { |
| // let's first search for an existing node with the same id |
| final Node parentNode = this.ensureRepositoryPath(); |
| Node foundNode = null; |
| if ( parentNode.hasNode(nodePath) ) { |
| foundNode = parentNode.getNode(nodePath); |
| } |
| if ( foundNode != null ) { |
| // if the node is locked, someone else was quicker |
| // and we don't have to process this job |
| if ( !foundNode.isLocked() ) { |
| // node is already in repository, so if not finished we just use it |
| // otherwise it has already been processed |
| try { |
| if ( !foundNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED) ) { |
| info.nodePath = foundNode.getPath(); |
| } |
| } catch (RepositoryException re) { |
| // if anything goes wrong, it means that (hopefully) someone |
| // else is processing this node |
| } |
| } |
| } else { |
| // We now write the event into the repository |
| try { |
| final Node eventNode = this.writeEvent(event, nodePath); |
| info.nodePath = eventNode.getPath(); |
| } catch (ItemExistsException iee) { |
| // someone else did already write this node in the meantime |
| // nothing to do for us |
| } |
| } |
| } catch (RepositoryException re ) { |
| // something went wrong, so let's log it |
| this.logger.error("Exception during writing new job '" + event + "' to repository at " + nodePath, re); |
| } |
| } |
| // if we were able to write the event into the repository |
| // we will queue it for processing |
| if ( info.nodePath != null ) { |
| try { |
| this.queue.put(info); |
| } catch (InterruptedException e) { |
| // this should never happen |
| this.ignoreException(e); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * This method runs in the background and processes the local queue. |
| */ |
| protected void runInBackground() throws RepositoryException { |
| this.backgroundSession = this.createSession(); |
| this.backgroundSession.getWorkspace().getObservationManager() |
| .addEventListener(this, |
| javax.jcr.observation.Event.PROPERTY_REMOVED |
| |javax.jcr.observation.Event.NODE_REMOVED, |
| this.repositoryPath, |
| true, |
| null, |
| new String[] {this.getEventNodeType()}, |
| true); |
| // give the system some time to start |
| try { |
| Thread.sleep(1000 * 30); // 30 secs |
| } catch (InterruptedException e) { |
| this.ignoreException(e); |
| } |
| // load unprocessed jobs from repository |
| if ( this.running ) { |
| this.loadJobs(); |
| } else { |
| final ComponentContext ctx = this.componentContext; |
| // deactivate |
| if ( ctx != null ) { |
| logger.info("Deactivating component {} due to errors during startup.", ctx.getProperties().get(Constants.SERVICE_ID)); |
| final String name = (String) componentContext.getProperties().get( |
| ComponentConstants.COMPONENT_NAME); |
| ctx.disableComponent(name); |
| } |
| } |
| while ( this.running ) { |
| // so let's wait/get the next job from the queue |
| EventInfo info = null; |
| try { |
| info = this.queue.take(); |
| } catch (InterruptedException e) { |
| // we ignore this |
| this.ignoreException(e); |
| } |
| |
| if ( info != null && this.running ) { |
| // check for local only jobs and remove them from the queue if they're meant |
| // for another application node |
| final String appId = (String)info.event.getProperty(EventUtil.PROPERTY_APPLICATION); |
| if ( info.event.getProperty(EventUtil.PROPERTY_JOB_RUN_LOCAL) != null |
| && appId != null && !this.applicationId.equals(appId) ) { |
| info = null; |
| } |
| |
| // check if we should put this into a separate queue |
| if ( info != null && info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) { |
| final String queueName = (String)info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME); |
| synchronized ( this.jobQueues ) { |
| BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName); |
| if ( jobQueue == null ) { |
| final JobBlockingQueue jq = new JobBlockingQueue(); |
| jobQueue = jq; |
| this.jobQueues.put(queueName, jq); |
| // Start background thread |
| this.threadPool.execute(new Runnable() { |
| |
| /** |
| * @see java.lang.Runnable#run() |
| */ |
| public void run() { |
| while ( running && !jq.isFinished() ) { |
| logger.info("Starting job queue {}", queueName); |
| try { |
| runJobQueue(queueName, jq); |
| } catch (Throwable t) { |
| logger.error("Job queue stopped with exception: " + t.getMessage() + ". Restarting.", t); |
| } |
| } |
| } |
| |
| }); |
| } |
| try { |
| jobQueue.put(info); |
| } catch (InterruptedException e) { |
| // this should never happen |
| this.ignoreException(e); |
| } |
| } |
| // don't process this here |
| info = null; |
| } |
| |
| // if we still have a job, process it |
| if ( info != null ) { |
| this.executeJob(info, null); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Execute a job queue |
| * @param queueName The name of the job queue |
| * @param jobQueue The job queue |
| */ |
| private void runJobQueue(final String queueName, final JobBlockingQueue jobQueue) { |
| EventInfo info = null; |
| while ( this.running && !jobQueue.isFinished() ) { |
| if ( info == null ) { |
| // so let's wait/get the next job from the queue |
| try { |
| info = jobQueue.take(); |
| } catch (InterruptedException e) { |
| // we ignore this |
| this.ignoreException(e); |
| } |
| } |
| |
| if ( info != null && this.running && !jobQueue.isFinished() ) { |
| synchronized ( jobQueue.getLock()) { |
| final EventInfo processInfo = info; |
| info = null; |
| if ( this.executeJob(processInfo, jobQueue) ) { |
| EventInfo newInfo = null; |
| try { |
| newInfo = jobQueue.waitForFinish(); |
| } catch (InterruptedException e) { |
| this.ignoreException(e); |
| } |
| // if we have an info, this is a reschedule |
| if ( newInfo != null ) { |
| final EventInfo newEventInfo = newInfo; |
| final Event job = newInfo.event; |
| |
| // is this an ordered queue? |
| final boolean orderedQueue = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null; |
| |
| if ( orderedQueue ) { |
| // we just sleep for the delay time - if none, we continue and retry |
| // this job again |
| if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) { |
| final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY); |
| jobQueue.setSleeping(true, Thread.currentThread()); |
| try { |
| Thread.sleep(delay); |
| } catch (InterruptedException e) { |
| this.ignoreException(e); |
| } finally { |
| jobQueue.setSleeping(false); |
| } |
| } |
| info = newInfo; |
| } else { |
| // delay rescheduling? |
| if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) { |
| final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY); |
| final Date fireDate = new Date(); |
| fireDate.setTime(System.currentTimeMillis() + delay); |
| |
| final String schedulerJobName = "Waiting:" + queueName; |
| final Runnable t = new Runnable() { |
| public void run() { |
| jobQueue.setSleeping(true, schedulerJobName); |
| try { |
| jobQueue.put(newEventInfo); |
| } catch (InterruptedException e) { |
| // this should never happen |
| ignoreException(e); |
| } finally { |
| jobQueue.setSleeping(false); |
| } |
| } |
| }; |
| try { |
| this.scheduler.fireJobAt(schedulerJobName, t, null, fireDate); |
| } catch (Exception e) { |
| // we ignore the exception and just put back the job in the queue |
| ignoreException(e); |
| t.run(); |
| } |
| } else { |
| // put directly into queue |
| try { |
| jobQueue.put(newInfo); |
| } catch (InterruptedException e) { |
| // this should never happen |
| this.ignoreException(e); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Process a job |
| */ |
| private boolean executeJob(final EventInfo info, final BlockingQueue<EventInfo> jobQueue) { |
| boolean putback = false; |
| boolean wait = false; |
| synchronized (this.backgroundLock) { |
| try { |
| this.backgroundSession.refresh(false); |
| // check if the node still exists |
| if ( this.backgroundSession.itemExists(info.nodePath) |
| && !this.backgroundSession.itemExists(info.nodePath + "/" + EventHelper.NODE_PROPERTY_FINISHED)) { |
| final Event event = info.event; |
| final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC); |
| final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null |
| || event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null; |
| |
| // check how we can process this job |
| // if parallel processing is allowed, we can just process |
| // if not we should check if any other job with the same topic is currently running |
| boolean process = parallelProcessing; |
| if ( !parallelProcessing ) { |
| synchronized ( this.processingMap ) { |
| final Boolean value = this.processingMap.get(jobTopic); |
| if ( value == null || !value.booleanValue() ) { |
| this.processingMap.put(jobTopic, Boolean.TRUE); |
| process = true; |
| } |
| } |
| |
| } else { |
| // check number of parallel jobs for main queue |
| if ( jobQueue == null && this.parallelJobCount >= this.maximumParallelJobs ) { |
| process = false; |
| wait = true; |
| } |
| } |
| if ( process ) { |
| boolean unlock = true; |
| try { |
| final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath); |
| if ( !eventNode.isLocked() ) { |
| // lock node |
| try { |
| eventNode.lock(false, true); |
| } catch (RepositoryException re) { |
| // lock failed which means that the node is locked by someone else, so we don't have to requeue |
| process = false; |
| } |
| if ( process ) { |
| unlock = false; |
| this.processJob(info.event, eventNode, jobQueue == null); |
| return true; |
| } |
| } |
| } catch (RepositoryException e) { |
| // ignore |
| this.ignoreException(e); |
| } finally { |
| if ( unlock && !parallelProcessing ) { |
| synchronized ( this.processingMap ) { |
| this.processingMap.put(jobTopic, Boolean.FALSE); |
| } |
| } |
| } |
| } else { |
| try { |
| // check if the node is in processing or already finished |
| final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath); |
| if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) { |
| putback = true; |
| } |
| } catch (RepositoryException e) { |
| // ignore |
| this.ignoreException(e); |
| } |
| } |
| } |
| } catch (RepositoryException re) { |
| this.ignoreException(re); |
| } |
| |
| } |
| // if this is the main queue and we have reached the max number of parallel jobs |
| // we wait a little bit before continuing |
| if ( wait ) { |
| try { |
| Thread.sleep(sleepTime * 1000); |
| } catch (InterruptedException ie) { |
| // ignore |
| ignoreException(ie); |
| } |
| } |
| // if we have to put back the job, we do it now |
| if ( putback ) { |
| final EventInfo eInfo = info; |
| final Date fireDate = new Date(); |
| fireDate.setTime(System.currentTimeMillis() + this.sleepTime * 1000); |
| |
| // we put it back into the queue after a specific time |
| final Runnable r = new Runnable() { |
| |
| /** |
| * @see java.lang.Runnable#run() |
| */ |
| public void run() { |
| try { |
| queue.put(eInfo); |
| } catch (InterruptedException e) { |
| // ignore |
| ignoreException(e); |
| } |
| } |
| |
| }; |
| try { |
| this.scheduler.fireJobAt(null, r, null, fireDate); |
| } catch (Exception e) { |
| // we ignore the exception |
| ignoreException(e); |
| // then wait for the time and readd the job |
| try { |
| Thread.sleep(sleepTime * 1000); |
| } catch (InterruptedException ie) { |
| // ignore |
| ignoreException(ie); |
| } |
| r.run(); |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * @see org.apache.sling.engine.event.impl.JobPersistenceHandler#getEventNodeType() |
| */ |
| protected String getEventNodeType() { |
| return EventHelper.JOB_NODE_TYPE; |
| } |
| |
| /** |
| * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event) |
| */ |
| public void handleEvent(final Event event) { |
| logger.debug("Receiving event {}", event); |
| // we ignore remote job events |
| if ( EventUtil.isLocal(event) ) { |
| // check for bundle event |
| if ( event.getTopic().equals(EventUtil.TOPIC_JOB)) { |
| logger.debug("Handling local job {}", event); |
| // job event |
| final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC); |
| |
| // job topic must be set, otherwise we ignore this event! |
| if ( jobTopic != null ) { |
| // queue the event in order to respond quickly |
| try { |
| this.writeQueue.put(event); |
| } catch (InterruptedException e) { |
| // this should never happen |
| this.ignoreException(e); |
| } |
| } else { |
| this.logger.warn("Event does not contain job topic: {}", event); |
| } |
| |
| } else { |
| // bundle event started or updated |
| boolean doIt = false; |
| synchronized ( this.unloadedJobs ) { |
| if ( this.unloadedJobs.size() > 0 ) { |
| doIt = true; |
| } |
| } |
| if ( doIt ) { |
| final Runnable t = new Runnable() { |
| |
| public void run() { |
| synchronized (unloadedJobs) { |
| Session s = null; |
| final Set<String> newUnloadedJobs = new HashSet<String>(); |
| newUnloadedJobs.addAll(unloadedJobs); |
| try { |
| s = createSession(); |
| for(String path : unloadedJobs ) { |
| newUnloadedJobs.remove(path); |
| try { |
| if ( s.itemExists(path) ) { |
| final Node eventNode = (Node) s.getItem(path); |
| if ( !eventNode.isLocked() ) { |
| try { |
| final EventInfo info = new EventInfo(); |
| info.event = readEvent(eventNode); |
| info.nodePath = path; |
| try { |
| queue.put(info); |
| } catch (InterruptedException e) { |
| // we ignore this exception as this should never occur |
| ignoreException(e); |
| } |
| } catch (ClassNotFoundException cnfe) { |
| newUnloadedJobs.add(path); |
| ignoreException(cnfe); |
| } |
| } |
| } |
| } catch (RepositoryException re) { |
| // we ignore this and readd |
| newUnloadedJobs.add(path); |
| ignoreException(re); |
| } |
| } |
| } catch (RepositoryException re) { |
| // unable to create session, so we try it again next time |
| ignoreException(re); |
| } finally { |
| if ( s != null ) { |
| s.logout(); |
| } |
| unloadedJobs.clear(); |
| unloadedJobs.addAll(newUnloadedJobs); |
| } |
| } |
| } |
| |
| }; |
| this.threadPool.execute(t); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Create a unique node path (folder and name) for the job. |
| */ |
| private String getNodePath(final String jobTopic, final String jobId) { |
| if ( jobId != null ) { |
| return jobTopic.replace('/', '.') + "/" + EventHelper.filter(jobId); |
| } |
| return jobTopic.replace('/', '.') + "/Job " + UUID.randomUUID().toString(); |
| } |
| |
| /** |
| * Process a job and unlock the node in the repository. |
| * @param event The original event. |
| * @param eventNode The node in the repository where the job is stored. |
| * @param isMainQueue Is this the main queue? |
| */ |
| private void processJob(Event event, Node eventNode, boolean isMainQueue) { |
| final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null |
| || event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null; |
| final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC); |
| boolean unlock = true; |
| try { |
| if ( isMainQueue && !parallelProcessing ) { |
| this.parallelJobCount++; |
| } |
| final String nodePath = eventNode.getPath(); |
| final Event jobEvent = this.getJobEvent(event, nodePath); |
| eventNode.setProperty(EventHelper.NODE_PROPERTY_PROCESSOR, this.applicationId); |
| eventNode.save(); |
| final EventAdmin localEA = this.eventAdmin; |
| if ( localEA != null ) { |
| final StartedJobInfo jobInfo = new StartedJobInfo(jobEvent, nodePath, System.currentTimeMillis()); |
| // let's add the event to our processing list |
| synchronized ( this.processingEventsList ) { |
| this.processingEventsList.put(nodePath, jobInfo); |
| } |
| |
| // we need async delivery, otherwise we might create a deadlock |
| // as this method runs inside a synchronized block and the finishedJob |
| // method as well! |
| localEA.postEvent(jobEvent); |
| // do not unlock if sending was successful |
| unlock = false; |
| } else { |
| this.logger.error("Job event can't be sent as no event admin is available."); |
| } |
| } catch (RepositoryException re) { |
| // if an exception occurs, we just log |
| this.logger.error("Exception during job processing.", re); |
| } finally { |
| if ( unlock ) { |
| if ( isMainQueue && !parallelProcessing ) { |
| this.parallelJobCount--; |
| } |
| if ( !parallelProcessing ) { |
| synchronized ( this.processingMap ) { |
| this.processingMap.put(jobTopic, Boolean.FALSE); |
| } |
| } |
| // unlock node |
| try { |
| eventNode.unlock(); |
| } catch (RepositoryException e) { |
| // if unlock fails, we silently ignore this |
| this.ignoreException(e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Create the real job event. |
| * This generates a new event object with the same properties, but with the |
| * {@link EventUtil#PROPERTY_JOB_TOPIC} topic. |
| * @param e The job event. |
| * @return The real job event. |
| */ |
| private Event getJobEvent(Event e, String nodePath) { |
| final String eventTopic = (String)e.getProperty(EventUtil.PROPERTY_JOB_TOPIC); |
| final Dictionary<String, Object> properties = new EventPropertiesMap(e); |
| // put properties for finished job callback |
| properties.put(EventUtil.JobStatusNotifier.CONTEXT_PROPERTY_NAME, |
| new EventUtil.JobStatusNotifier.NotifierContext(this, nodePath)); |
| return new Event(eventTopic, properties); |
| } |
| |
| /** |
| * @see org.apache.sling.engine.event.impl.JobPersistenceHandler#addNodeProperties(javax.jcr.Node, org.osgi.service.event.Event) |
| */ |
| protected void addNodeProperties(Node eventNode, Event event) |
| throws RepositoryException { |
| super.addNodeProperties(eventNode, event); |
| eventNode.setProperty(EventHelper.NODE_PROPERTY_TOPIC, (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC)); |
| final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID); |
| if ( jobId != null ) { |
| eventNode.setProperty(EventHelper.NODE_PROPERTY_JOBID, jobId); |
| } |
| final long retryCount = OsgiUtil.toLong(event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT), 0); |
| final long retries = OsgiUtil.toLong(event.getProperty(EventUtil.PROPERTY_JOB_RETRIES), this.maxJobRetries); |
| eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT, retryCount); |
| eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRIES, retries); |
| } |
| |
| /** |
| * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#addEventProperties(javax.jcr.Node, java.util.Dictionary) |
| */ |
| protected void addEventProperties(Node eventNode, |
| Dictionary<String, Object> properties) |
| throws RepositoryException { |
| super.addEventProperties(eventNode, properties); |
| // convert to integers (jcr only supports long) |
| if ( properties.get(EventUtil.PROPERTY_JOB_RETRIES) != null ) { |
| properties.put(EventUtil.PROPERTY_JOB_RETRIES, Integer.valueOf(properties.get(EventUtil.PROPERTY_JOB_RETRIES).toString())); |
| } |
| if ( properties.get(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) { |
| properties.put(EventUtil.PROPERTY_JOB_RETRY_COUNT, Integer.valueOf(properties.get(EventUtil.PROPERTY_JOB_RETRY_COUNT).toString())); |
| } |
| // add application id |
| properties.put(EventUtil.PROPERTY_APPLICATION, eventNode.getProperty(EventHelper.NODE_PROPERTY_APPLICATION).getString()); |
| } |
| |
| /** |
| * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator) |
| */ |
| public void onEvent(EventIterator iter) { |
| // we create an own session here |
| Session s = null; |
| try { |
| s = this.createSession(); |
| while ( iter.hasNext() ) { |
| final javax.jcr.observation.Event event = iter.nextEvent(); |
| if ( event.getType() == javax.jcr.observation.Event.PROPERTY_CHANGED |
| || event.getType() == javax.jcr.observation.Event.PROPERTY_REMOVED) { |
| try { |
| final String propPath = event.getPath(); |
| int pos = propPath.lastIndexOf('/'); |
| final String nodePath = propPath.substring(0, pos); |
| final String propertyName = propPath.substring(pos+1); |
| |
| // we are only interested in unlocks |
| if ( "jcr:lockOwner".equals(propertyName) ) { |
| boolean doNotProcess = false; |
| synchronized ( this.deletedJobs ) { |
| doNotProcess = this.deletedJobs.remove(nodePath); |
| } |
| if ( !doNotProcess ) { |
| final Node eventNode = (Node) s.getItem(nodePath); |
| if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) { |
| try { |
| final EventInfo info = new EventInfo(); |
| info.event = this.readEvent(eventNode); |
| info.nodePath = nodePath; |
| try { |
| this.queue.put(info); |
| } catch (InterruptedException e) { |
| // we ignore this exception as this should never occur |
| this.ignoreException(e); |
| } |
| } catch (ClassNotFoundException cnfe) { |
| // store path for lazy loading |
| synchronized ( this.unloadedJobs ) { |
| this.unloadedJobs.add(nodePath); |
| } |
| this.ignoreException(cnfe); |
| } |
| } |
| } |
| } |
| } catch (RepositoryException re) { |
| this.logger.error("Exception during jcr event processing.", re); |
| } |
| } |
| } |
| } catch (RepositoryException re) { |
| this.logger.error("Unable to create a session.", re); |
| } finally { |
| if ( s != null ) { |
| s.logout(); |
| } |
| } |
| } |
| |
| /** |
| * Load all active jobs from the repository. |
| * @throws RepositoryException |
| */ |
| private void loadJobs() { |
| try { |
| final QueryManager qManager = this.backgroundSession.getWorkspace().getQueryManager(); |
| final StringBuffer buffer = new StringBuffer("/jcr:root"); |
| buffer.append(this.repositoryPath); |
| buffer.append("//element(*, "); |
| buffer.append(this.getEventNodeType()); |
| buffer.append(") order by @"); |
| buffer.append(EventHelper.NODE_PROPERTY_CREATED); |
| buffer.append(" ascending"); |
| final Query q = qManager.createQuery(buffer.toString(), Query.XPATH); |
| final NodeIterator result = q.execute().getNodes(); |
| while ( result.hasNext() ) { |
| final Node eventNode = result.nextNode(); |
| if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) { |
| final String nodePath = eventNode.getPath(); |
| try { |
| final Event event = this.readEvent(eventNode); |
| final EventInfo info = new EventInfo(); |
| info.event = event; |
| info.nodePath = nodePath; |
| try { |
| this.queue.put(info); |
| } catch (InterruptedException e) { |
| // we ignore this exception as this should never occur |
| this.ignoreException(e); |
| } |
| } catch (ClassNotFoundException cnfe) { |
| // store path for lazy loading |
| synchronized ( this.unloadedJobs ) { |
| this.unloadedJobs.add(nodePath); |
| } |
| this.ignoreException(cnfe); |
| } catch (RepositoryException re) { |
| this.logger.error("Unable to load stored job from " + nodePath, re); |
| } |
| } |
| } |
| } catch (RepositoryException re) { |
| this.logger.error("Exception during initial loading of stored jobs.", re); |
| } |
| } |
| |
| /** |
| * @see org.apache.sling.event.EventUtil.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event, java.lang.String) |
| */ |
| public boolean sendAcknowledge(Event job, String eventNodePath) { |
| synchronized ( this.processingEventsList ) { |
| // if the event is still in the processing list, we confirm the ack |
| final Object ack = this.processingEventsList.remove(eventNodePath); |
| return ack != null; |
| } |
| |
| } |
| |
| /** |
| * This is a notification from the component which processed the job. |
| * |
| * @see org.apache.sling.event.EventUtil.JobStatusNotifier#finishedJob(org.osgi.service.event.Event, String, boolean) |
| */ |
| public boolean finishedJob(Event job, String eventNodePath, boolean shouldReschedule) { |
| // let's remove the event from our processing list |
| // this is just a sanity check, as usually the job should have been |
| // removed during sendAcknowledge. |
| synchronized ( this.processingEventsList ) { |
| this.processingEventsList.remove(eventNodePath); |
| } |
| |
| boolean reschedule = shouldReschedule; |
| if ( shouldReschedule ) { |
| // check if we exceeded the number of retries |
| int retries = this.maxJobRetries; |
| if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRIES) != null ) { |
| retries = (Integer) job.getProperty(EventUtil.PROPERTY_JOB_RETRIES); |
| } |
| int retryCount = 0; |
| if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) { |
| retryCount = (Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT); |
| } |
| retryCount++; |
| if ( retries != -1 && retryCount > retries ) { |
| reschedule = false; |
| } |
| if ( reschedule ) { |
| // update event with retry count and retries |
| final Dictionary<String, Object> newProperties = new EventPropertiesMap(job); |
| newProperties.put(EventUtil.PROPERTY_JOB_RETRY_COUNT, retryCount); |
| newProperties.put(EventUtil.PROPERTY_JOB_RETRIES, retries); |
| job = new Event(job.getTopic(), newProperties); |
| } |
| } |
| final boolean parallelProcessing = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null |
| || job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null; |
| EventInfo putback = null; |
| // we have to use the same session for unlocking that we used for locking! |
| synchronized ( this.backgroundLock ) { |
| // we might get here asnyc while this service has already been shutdown! |
| if ( this.backgroundSession == null ) { |
| // we can only return false here |
| return false; |
| } |
| try { |
| this.backgroundSession.refresh(false); |
| // check if the job has been cancelled |
| if ( !this.backgroundSession.itemExists(eventNodePath) ) { |
| return true; |
| } |
| final Node eventNode = (Node) this.backgroundSession.getItem(eventNodePath); |
| boolean unlock = true; |
| try { |
| if ( !reschedule ) { |
| synchronized ( this.deletedJobs ) { |
| this.deletedJobs.add(eventNodePath); |
| } |
| // unlock node |
| try { |
| eventNode.unlock(); |
| } catch (RepositoryException e) { |
| // if unlock fails, we silently ignore this |
| this.ignoreException(e); |
| } |
| unlock = false; |
| final String jobId = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID); |
| if ( jobId == null ) { |
| // remove node from repository if no job is set |
| final Node parentNode = eventNode.getParent(); |
| eventNode.remove(); |
| parentNode.save(); |
| } else { |
| eventNode.setProperty(EventHelper.NODE_PROPERTY_FINISHED, Calendar.getInstance()); |
| eventNode.save(); |
| } |
| } |
| } catch (RepositoryException re) { |
| // if an exception occurs, we just log |
| this.logger.error("Exception during job finishing.", re); |
| } finally { |
| if ( !parallelProcessing) { |
| final String jobTopic = (String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC); |
| synchronized ( this.processingMap ) { |
| this.processingMap.put(jobTopic, Boolean.FALSE); |
| } |
| } else { |
| if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) == null ) { |
| this.parallelJobCount--; |
| } |
| } |
| if ( unlock ) { |
| synchronized ( this.deletedJobs ) { |
| this.deletedJobs.add(eventNodePath); |
| } |
| // unlock node |
| try { |
| eventNode.unlock(); |
| } catch (RepositoryException e) { |
| // if unlock fails, we silently ignore this |
| this.ignoreException(e); |
| } |
| } |
| } |
| if ( reschedule ) { |
| // update retry count and retries in the repository |
| try { |
| eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRIES, (Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRIES)); |
| eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT, (Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT)); |
| eventNode.save(); |
| } catch (RepositoryException re) { |
| // if an exception occurs, we just log |
| this.logger.error("Exception during job updating job rescheduling information.", re); |
| } |
| final EventInfo info = new EventInfo(); |
| try { |
| info.event = job; |
| info.nodePath = eventNode.getPath(); |
| } catch (RepositoryException e) { |
| // this should never happen |
| this.ignoreException(e); |
| } |
| // if this is an own job queue, we simply signal the queue to continue |
| // it will pick up the event and either reschedule or wait |
| if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) { |
| // we know the queue exists |
| final JobBlockingQueue jobQueue; |
| synchronized ( this.jobQueues ) { |
| jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME)); |
| } |
| synchronized ( jobQueue.getLock()) { |
| jobQueue.notifyFinish(info); |
| } |
| } else { |
| |
| // delay rescheduling? |
| if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) { |
| putback = info; |
| } else { |
| // put directly into queue |
| try { |
| queue.put(info); |
| } catch (InterruptedException e) { |
| // this should never happen |
| this.ignoreException(e); |
| } |
| } |
| } |
| } else { |
| // if this is an own job queue, we simply signal the queue to continue |
| // it will pick up the event and continue with the next event |
| if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) { |
| // we know the queue exists |
| final JobBlockingQueue jobQueue; |
| synchronized ( this.jobQueues ) { |
| jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME)); |
| } |
| synchronized ( jobQueue.getLock()) { |
| jobQueue.notifyFinish(null); |
| } |
| } |
| } |
| } catch (RepositoryException re) { |
| this.logger.error("Unable to create new session.", re); |
| return false; |
| } |
| } |
| if ( putback != null ) { |
| final EventInfo info = putback; |
| final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY); |
| final Date fireDate = new Date(); |
| fireDate.setTime(System.currentTimeMillis() + delay); |
| |
| final Runnable t = new Runnable() { |
| public void run() { |
| try { |
| queue.put(info); |
| } catch (InterruptedException e) { |
| // this should never happen |
| ignoreException(e); |
| } |
| } |
| }; |
| try { |
| this.scheduler.fireJobAt(null, t, null, fireDate); |
| } catch (Exception e) { |
| // we ignore the exception and just put back the job in the queue |
| ignoreException(e); |
| t.run(); |
| } |
| } |
| if ( !shouldReschedule ) { |
| return true; |
| } |
| return reschedule; |
| } |
| |
| /** |
| * Search for job nodes |
| * @param topic The job topic |
| * @param filterProps optional filter props |
| * @param locked only active jobs? |
| * @return |
| * @throws RepositoryException |
| */ |
| private Collection<Event> queryJobs(final String topic, |
| final Boolean locked, |
| final Map<String, Object>... filterProps) { |
| // we create a new session |
| Session s = null; |
| final List<Event> jobs = new ArrayList<Event>(); |
| try { |
| s = this.createSession(); |
| final QueryManager qManager = s.getWorkspace().getQueryManager(); |
| final StringBuffer buffer = new StringBuffer("/jcr:root"); |
| buffer.append(this.repositoryPath); |
| if ( topic != null ) { |
| buffer.append('/'); |
| buffer.append(topic.replace('/', '.')); |
| } |
| buffer.append("//element(*, "); |
| buffer.append(this.getEventNodeType()); |
| buffer.append(") [not(@"); |
| buffer.append(EventHelper.NODE_PROPERTY_FINISHED); |
| buffer.append(")"); |
| if ( locked != null ) { |
| if ( locked ) { |
| buffer.append(" and @jcr:lockOwner"); |
| } else { |
| buffer.append(" and not(@jcr:lockOwner)"); |
| } |
| } |
| if ( filterProps != null && filterProps.length > 0 ) { |
| buffer.append(" and ("); |
| int index = 0; |
| for (Map<String,Object> template : filterProps) { |
| if ( index > 0 ) { |
| buffer.append(" or "); |
| } |
| buffer.append('('); |
| final Iterator<Map.Entry<String, Object>> i = template.entrySet().iterator(); |
| boolean first = true; |
| while ( i.hasNext() ) { |
| final Map.Entry<String, Object> current = i.next(); |
| // check prop name first |
| final String propName = EventUtil.getNodePropertyName(current.getKey()); |
| if ( propName != null ) { |
| // check value |
| final Value value = EventUtil.getNodePropertyValue(s.getValueFactory(), current.getValue()); |
| if ( value != null ) { |
| if ( first ) { |
| first = false; |
| buffer.append('@'); |
| } else { |
| buffer.append(" and @"); |
| } |
| buffer.append(propName); |
| buffer.append(" = '"); |
| buffer.append(current.getValue()); |
| buffer.append("'"); |
| } |
| } |
| } |
| buffer.append(')'); |
| index++; |
| } |
| buffer.append(')'); |
| } |
| buffer.append("]"); |
| buffer.append(" order by @"); |
| buffer.append(EventHelper.NODE_PROPERTY_CREATED); |
| buffer.append(" ascending"); |
| final String queryString = buffer.toString(); |
| logger.debug("Executing job query {}.", queryString); |
| |
| final Query q = qManager.createQuery(queryString, Query.XPATH); |
| final NodeIterator iter = q.execute().getNodes(); |
| while ( iter.hasNext() ) { |
| final Node eventNode = iter.nextNode(); |
| try { |
| final Event event = this.readEvent(eventNode); |
| jobs.add(event); |
| } catch (ClassNotFoundException cnfe) { |
| // in the case of a class not found exception we just ignore the exception |
| this.ignoreException(cnfe); |
| } |
| } |
| } catch (RepositoryException e) { |
| // in the case of an error, we return an empty list |
| this.ignoreException(e); |
| } finally { |
| if ( s != null) { |
| s.logout(); |
| } |
| } |
| return jobs; |
| } |
| |
| /** |
| * @see org.apache.sling.event.JobStatusProvider#getCurrentJobs(java.lang.String) |
| */ |
| public Collection<Event> getCurrentJobs(String topic) { |
| return this.getCurrentJobs(topic, (Map<String, Object>[])null); |
| } |
| |
| /** |
| * This is deprecated. |
| */ |
| public Collection<Event> scheduledJobs(String topic) { |
| return this.getScheduledJobs(topic); |
| } |
| |
| /** |
| * @see org.apache.sling.event.JobStatusProvider#getScheduledJobs(java.lang.String) |
| */ |
| public Collection<Event> getScheduledJobs(String topic) { |
| return this.getScheduledJobs(topic, (Map<String, Object>[])null); |
| } |
| |
| /** |
| * @see org.apache.sling.event.JobStatusProvider#getCurrentJobs(java.lang.String, java.util.Map...) |
| */ |
| public Collection<Event> getCurrentJobs(String topic, Map<String, Object>... filterProps) { |
| return this.queryJobs(topic, true, filterProps); |
| } |
| |
| /** |
| * @see org.apache.sling.event.JobStatusProvider#getScheduledJobs(java.lang.String, java.util.Map...) |
| */ |
| public Collection<Event> getScheduledJobs(String topic, Map<String, Object>... filterProps) { |
| return this.queryJobs(topic, false, filterProps); |
| } |
| |
| |
| /** |
| * @see org.apache.sling.event.JobStatusProvider#getAllJobs(java.lang.String, java.util.Map...) |
| */ |
| public Collection<Event> getAllJobs(String topic, Map<String, Object>... filterProps) { |
| return this.queryJobs(topic, null, filterProps); |
| } |
| |
| |
| /** |
| * @see org.apache.sling.event.JobStatusProvider#cancelJob(java.lang.String, java.lang.String) |
| */ |
| public void cancelJob(String topic, String jobId) { |
| if ( jobId != null && topic != null ) { |
| this.cancelJob(this.getNodePath(topic, jobId)); |
| } |
| } |
| |
| /** |
| * @see org.apache.sling.event.JobStatusProvider#cancelJob(java.lang.String) |
| */ |
| public void cancelJob(String jobId) { |
| if ( jobId != null ) { |
| synchronized ( this.writeLock ) { |
| try { |
| this.writerSession.refresh(false); |
| } catch (RepositoryException e) { |
| this.ignoreException(e); |
| } |
| try { |
| if ( this.writerSession.itemExists(jobId) ) { |
| final Item item = this.writerSession.getItem(jobId); |
| final Node parentNode = item.getParent(); |
| item.remove(); |
| parentNode.save(); |
| } |
| } catch (RepositoryException e) { |
| this.logger.error("Error during cancelling job at " + jobId, e); |
| } |
| } |
| } |
| } |
| |
| |
| /** |
| * @see org.apache.sling.event.JobStatusProvider#wakeUpJobQueue(java.lang.String) |
| */ |
| public void wakeUpJobQueue(String jobQueueName) { |
| if ( jobQueueName != null ) { |
| synchronized ( this.jobQueues ) { |
| final JobBlockingQueue queue = this.jobQueues.get(jobQueueName); |
| if ( queue != null && queue.isSleeping() ) { |
| final String schedulerJobName = queue.getSchedulerJobName(); |
| final Thread thread = queue.getSleepingThread(); |
| if ( schedulerJobName != null ) { |
| this.scheduler.removeJob(schedulerJobName); |
| } |
| if ( thread != null ) { |
| thread.interrupt(); |
| } |
| } |
| } |
| } |
| } |
| |
| |
| private static final class StartedJobInfo { |
| public final Event event; |
| public final String nodePath; |
| public final long started; |
| |
| public StartedJobInfo(final Event e, final String path, final long started) { |
| this.event = e; |
| this.nodePath = path; |
| this.started = started; |
| } |
| } |
| } |