blob: 0c7cc3a72c6345ca4960df96074123cbf06a835f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.event.impl;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.jcr.ItemExistsException;
import javax.jcr.Node;
import javax.jcr.NodeIterator;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.observation.EventIterator;
import javax.jcr.query.Query;
import javax.jcr.query.QueryManager;
import org.apache.jackrabbit.util.ISO8601;
import org.apache.sling.commons.osgi.OsgiUtil;
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.JobStatusProvider;
import org.osgi.framework.BundleEvent;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
/**
* An event handler handling special job events.
*
* @scr.component label="%job.events.name" description="%job.events.description"
* @scr.service interface="org.apache.sling.event.JobStatusProvider"
* @scr.property name="event.topics" refValues="EventUtil.TOPIC_JOB"
* values.updated="org/osgi/framework/BundleEvent/UPDATED"
* values.started="org/osgi/framework/BundleEvent/STARTED"
* private="true"
* @scr.property name="repository.path" value="/var/eventing/jobs" private="true"
* We schedule this event handler to run in the background and clean up
* obsolete events.
* @scr.service interface="java.lang.Runnable"
* @scr.property name="scheduler.period" value="600" type="Long"
* @scr.property name="scheduler.concurrent" value="false" type="Boolean" private="true"
*/
public class JobEventHandler
extends AbstractRepositoryEventHandler
implements EventUtil.JobStatusNotifier, JobStatusProvider, Runnable {
/**
* The topic prefix for bundle events: the class name of {@link BundleEvent}
* with all dots replaced by slashes, followed by a trailing slash.
*/
protected static final String BUNDLE_EVENT_PREFIX = BundleEvent.class.getName().replace('.', '/') + '/';
/**
* A map for keeping track of currently processed job topics.
* The key is the job topic; the value is TRUE while a job of that topic which
* does not allow parallel processing is being processed. All accesses in this
* class synchronize on the map itself.
*/
protected final Map<String, Boolean> processingMap = new HashMap<String, Boolean>();
/** Default sleep time (same unit as {@link #sleepTime}). */
protected static final long DEFAULT_SLEEP_TIME = 20;
/** @scr.property valueRef="DEFAULT_SLEEP_TIME" */
protected static final String CONFIG_PROPERTY_SLEEP_TIME = "sleep.time";
/** Default number of job retries. */
protected static final int DEFAULT_MAX_JOB_RETRIES = 10;
/** @scr.property valueRef="DEFAULT_MAX_JOB_RETRIES" */
protected static final String CONFIG_PROPERTY_MAX_JOB_RETRIES = "max.job.retries";
/**
* Wait time of the background loop before a requeued job is checked again.
* The value is handed to Thread.sleep() unchanged, so it is effectively
* interpreted as milliseconds.
* NOTE(review): this comment used to say "every 20 secs" - confirm the intended unit.
*/
protected long sleepTime;
/**
* Maximum number of retries for a job; used when the job itself does not
* specify a retry limit.
*/
protected int maxJobRetries;
/**
* Background session. It is created by the background thread, used for
* locking and unlocking the job nodes and also serves as the monitor
* guarding these operations.
*/
protected Session backgroundSession;
/**
* Unloaded jobs: the paths of job nodes whose event could not be read because
* of a ClassNotFoundException. Loading is retried when a bundle event arrives.
* Accesses synchronize on the set itself.
*/
protected Set<String>unloadedJobs = new HashSet<String>();
/**
* List of deleted jobs: the paths of job nodes unlocked by finishedJob().
* The repository event caused by such an unlock must not requeue the job.
* Accesses synchronize on the set itself.
*/
protected Set<String>deletedJobs = new HashSet<String>();
/** Default clean up time is 10 minutes. */
protected static final int DEFAULT_CLEANUP_PERIOD = 10;
/** @scr.property valueRef="DEFAULT_CLEANUP_PERIOD" type="Integer" */
protected static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
/**
* Finished jobs older than this number of minutes are removed by the periodic
* clean up (10 minutes by default). A value of zero or less disables the clean up.
*/
protected int cleanupPeriod = DEFAULT_CLEANUP_PERIOD;
/**
 * Activate this component.
 * The clean up period, the sleep time and the maximum number of job retries
 * are taken from the component configuration (missing values fall back to the
 * defaults); afterwards the activation of the super class is invoked.
 * @param context The component context providing the configuration.
 * @throws Exception Anything thrown while activating the super class.
 */
protected void activate(final ComponentContext context)
throws Exception {
    @SuppressWarnings("unchecked")
    final Dictionary<String, Object> config = context.getProperties();

    final Object cleanupValue = config.get(CONFIG_PROPERTY_CLEANUP_PERIOD);
    this.cleanupPeriod = OsgiUtil.toInteger(cleanupValue, DEFAULT_CLEANUP_PERIOD);

    final Object sleepValue = config.get(CONFIG_PROPERTY_SLEEP_TIME);
    this.sleepTime = OsgiUtil.toLong(sleepValue, DEFAULT_SLEEP_TIME);

    final Object retriesValue = config.get(CONFIG_PROPERTY_MAX_JOB_RETRIES);
    this.maxJobRetries = OsgiUtil.toInteger(retriesValue, DEFAULT_MAX_JOB_RETRIES);

    // the configuration has to be in place before the super class is activated
    super.activate(context);
}
/**
 * Deactivate this component: after the super class has been deactivated, the
 * observation listener is removed from the background session (if there is
 * one) and that session is logged out and discarded.
 * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#deactivate(org.osgi.service.component.ComponentContext)
 */
protected void deactivate(final ComponentContext context) {
    super.deactivate(context);

    final Session session = this.backgroundSession;
    if ( session == null ) {
        // the background session has never been created, nothing to clean up
        return;
    }
    try {
        session.getWorkspace().getObservationManager().removeEventListener(this);
    } catch (RepositoryException e) {
        // not fatal, the session is closed anyway
        this.logger.warn("Unable to remove event listener.", e);
    }
    session.logout();
    this.backgroundSession = null;
}
/**
 * Return the query string for the clean up.
 * The XPath query selects all nodes of the event node type below the
 * repository path whose finished date lies more than {@link #cleanupPeriod}
 * minutes in the past.
 */
protected String getCleanUpQueryString() {
    final Calendar threshold = Calendar.getInstance();
    threshold.add(Calendar.MINUTE, -this.cleanupPeriod);

    return "/jcr:root" + this.repositoryPath
        + "//element(*, " + getEventNodeType()
        + ")[@" + EventHelper.NODE_PROPERTY_FINISHED
        + " < xs:dateTime('" + ISO8601.format(threshold)
        + "')]";
}
/**
 * This method is invoked periodically and removes all job nodes matched by
 * the clean up query. Nothing happens if the clean up period is zero or
 * negative. Repository problems are logged as warnings only.
 * @see java.lang.Runnable#run()
 */
public void run() {
    if ( this.cleanupPeriod <= 0 ) {
        // clean up is disabled
        return;
    }
    this.logger.debug("Cleaning up repository, removing all finished jobs older than {} minutes.", this.cleanupPeriod);
    final String queryString = this.getCleanUpQueryString();

    // a separate session is used to avoid concurrency issues
    Session cleanupSession = null;
    try {
        cleanupSession = this.createSession();
        final Node jobsRoot = (Node) cleanupSession.getItem(this.repositoryPath);
        logger.debug("Executing query {}", queryString);
        final QueryManager queryManager = cleanupSession.getWorkspace().getQueryManager();
        final Query query = queryManager.createQuery(queryString, Query.XPATH);

        int removed = 0;
        for (final NodeIterator nodes = query.execute().getNodes(); nodes.hasNext(); ) {
            nodes.nextNode().remove();
            removed++;
        }
        // persist all removals at once
        jobsRoot.save();
        logger.debug("Removed {} entries from the repository.", removed);
    } catch (RepositoryException e) {
        // a failed clean up is not critical
        this.logger.warn("Exception during repository cleanup.", e);
    } finally {
        if ( cleanupSession != null ) {
            cleanupSession.logout();
        }
    }
}
/**
 * Takes job events from the write queue, persists them in the repository and
 * puts every job that should be processed by this instance into the
 * processing queue.
 * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
 */
protected void processWriteQueue() {
    while ( this.running ) {
        // block until the next job event arrives
        Event event = null;
        try {
            event = this.writeQueue.take();
        } catch (InterruptedException e) {
            this.ignoreException(e);
        }
        if ( event == null || !this.running ) {
            continue;
        }

        try {
            this.writerSession.refresh(false);
        } catch (RepositoryException re) {
            // a failed refresh is ignored
            this.ignoreException(re);
        }

        final EventInfo info = new EventInfo();
        info.event = event;
        final String nodeName = this.getNodeName(event);
        final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);

        try {
            if ( jobId == null ) {
                // a job without an id is always new: just write it
                info.nodePath = this.writeEvent(event, nodeName).getPath();
            } else {
                // a job with an id might already be stored
                final Node jobsRoot = (Node)this.writerSession.getItem(this.repositoryPath);
                final Node existing = jobsRoot.hasNode(nodeName) ? jobsRoot.getNode(nodeName) : null;
                if ( existing == null ) {
                    try {
                        info.nodePath = this.writeEvent(event, nodeName).getPath();
                    } catch (ItemExistsException iee) {
                        // someone else wrote the node in the meantime, nothing to do
                    }
                } else if ( !existing.isLocked() ) {
                    // a locked node is processed by someone else; an unlocked one
                    // is used unless the job has already been finished
                    try {
                        if ( !existing.hasProperty(EventHelper.NODE_PROPERTY_FINISHED) ) {
                            info.nodePath = existing.getPath();
                        }
                    } catch (RepositoryException re) {
                        // (hopefully) someone else is processing this node
                    }
                }
            }
        } catch (RepositoryException re) {
            this.logger.error("Exception during writing new job '" + nodeName + "' to repository.", re);
        }

        // only jobs with a node in the repository are queued for processing
        if ( info.nodePath != null ) {
            try {
                this.queue.put(info);
            } catch (InterruptedException e) {
                // this should never happen
                this.ignoreException(e);
            }
        }
    }
}
/**
* This method runs in the background and processes the local queue.
* It creates the background session, registers this handler as an observation
* listener, queues the jobs already stored in the repository and then loops
* over the queue as long as this component is running. Each job is either
* locked and processed, requeued (another job of the same topic is currently
* processed) or dropped (its node is gone or locked).
* @throws RepositoryException If the session or the listener can't be set up.
*/
protected void runInBackground() throws RepositoryException {
this.backgroundSession = this.createSession();
// listen for removed properties below the job root; onEvent() uses the
// removal of jcr:lockOwner to detect unlocked jobs
this.backgroundSession.getWorkspace().getObservationManager()
.addEventListener(this,
javax.jcr.observation.Event.PROPERTY_REMOVED,
this.repositoryPath,
true,
null,
new String[] {this.getEventNodeType()},
true);
// load unprocessed jobs from repository
this.loadJobs();
while ( this.running ) {
// so let's wait/get the next job from the queue
EventInfo info = null;
try {
info = this.queue.take();
} catch (InterruptedException e) {
// we ignore this
this.ignoreException(e);
}
if ( info != null && this.running ) {
// check if the node still exists
// (finishedJob() synchronizes on the same session object)
synchronized (this.backgroundSession) {
try {
this.backgroundSession.refresh(false);
if ( this.backgroundSession.itemExists(info.nodePath) ) {
final Event event = info.event;
final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
// check how we can process this job
// if parallel processing is allowed, we can just process
// if not we should check if any other job with the same topic is currently running
boolean process = parallelProcessing;
if ( !process ) {
synchronized ( this.processingMap ) {
final Boolean value = this.processingMap.get(jobTopic);
if ( value == null || !value.booleanValue() ) {
// no job of this topic is running: reserve the topic for this job
this.processingMap.put(jobTopic, Boolean.TRUE);
process = true;
}
}
}
if ( process ) {
// unlock == true means: processing did not start, so the topic
// reserved above has to be released in the finally block
boolean unlock = true;
try {
final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
if ( !eventNode.isLocked() ) {
// lock node
try {
eventNode.lock(false, true);
} catch (RepositoryException re) {
// lock failed which means that the node is locked by someone else, so we don't have to requeue
process = false;
}
if ( process ) {
// from here on processJob() and finishedJob() release the topic
unlock = false;
this.processJob(info.event, eventNode);
}
}
} catch (RepositoryException e) {
// ignore
this.ignoreException(e);
} finally {
if ( unlock && !parallelProcessing ) {
synchronized ( this.processingMap ) {
this.processingMap.put(jobTopic, Boolean.FALSE);
}
}
}
} else {
// another job of the same topic is currently processed
try {
// check if the node is in processing or already finished
final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
if ( !eventNode.isLocked() ) {
// not processed by anyone yet: requeue the job and try again later
try {
this.queue.put(info);
} catch (InterruptedException e) {
// ignore
this.ignoreException(e);
}
// wait time before we restart the cycle, if there is only one job in the queue!
// (sleepTime is passed to Thread.sleep() unchanged, i.e. as milliseconds;
// the monitor of the background session is held while sleeping)
if ( this.queue.size() == 1 ) {
try {
Thread.sleep(this.sleepTime);
} catch (InterruptedException e) {
// ignore
this.ignoreException(e);
}
}
}
} catch (RepositoryException e) {
// ignore
this.ignoreException(e);
}
}
}
} catch (RepositoryException re) {
this.ignoreException(re);
}
}
}
}
}
/**
* Returns the node type for the container of the jobs:
* {@link EventHelper#JOBS_NODE_TYPE}.
* NOTE(review): the former see-reference pointed to
* org.apache.sling.engine.event.impl.JobPersistenceHandler, which is not the
* super class of this handler; presumably this implements a hook of
* AbstractRepositoryEventHandler - confirm.
*/
protected String getContainerNodeType() {
return EventHelper.JOBS_NODE_TYPE;
}
/**
* Returns the node type of a single stored job:
* {@link EventHelper#JOB_NODE_TYPE}. This class uses the value as the element
* type of its queries and as the node type filter of its observation listener.
* NOTE(review): the former see-reference pointed to
* org.apache.sling.engine.event.impl.JobPersistenceHandler, which is not the
* super class of this handler; presumably this implements a hook of
* AbstractRepositoryEventHandler - confirm.
*/
protected String getEventNodeType() {
return EventHelper.JOB_NODE_TYPE;
}
/**
 * Handles local events only. A job event is put into the write queue (a job
 * event without a job topic is logged and dropped). Every other event - a
 * bundle has been started or updated - triggers a new attempt to load the
 * jobs whose classes could not be found so far.
 * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
 */
public void handleEvent(final Event event) {
    // remote job events are ignored
    if ( !EventUtil.isLocal(event) ) {
        return;
    }

    if ( event.getTopic().equals(EventUtil.TOPIC_JOB) ) {
        // job event: the job topic is mandatory
        final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
        if ( jobTopic == null ) {
            this.logger.warn("Event does not contain job topic: {}", event);
            return;
        }
        // only queue the event in order to respond quickly
        try {
            this.writeQueue.put(event);
        } catch (InterruptedException e) {
            // this should never happen
            this.ignoreException(e);
        }
        return;
    }

    // bundle event (started or updated): anything to retry?
    final boolean hasUnloadedJobs;
    synchronized ( this.unloadedJobs ) {
        hasUnloadedJobs = !this.unloadedJobs.isEmpty();
    }
    if ( !hasUnloadedJobs ) {
        return;
    }

    final Runnable retryTask = new Runnable() {
        public void run() {
            synchronized (unloadedJobs) {
                Session s = null;
                // starts as a copy; every path handled successfully is taken out
                final Set<String> stillUnloaded = new HashSet<String>(unloadedJobs);
                try {
                    s = createSession();
                    for (final String path : unloadedJobs) {
                        stillUnloaded.remove(path);
                        try {
                            if ( !s.itemExists(path) ) {
                                continue;
                            }
                            final Node eventNode = (Node) s.getItem(path);
                            if ( eventNode.isLocked() ) {
                                continue;
                            }
                            try {
                                final EventInfo info = new EventInfo();
                                info.event = readEvent(eventNode);
                                info.nodePath = path;
                                try {
                                    queue.put(info);
                                } catch (InterruptedException e) {
                                    // we ignore this exception as this should never occur
                                    ignoreException(e);
                                }
                            } catch (ClassNotFoundException cnfe) {
                                // class still missing: keep the job for the next attempt
                                stillUnloaded.add(path);
                                ignoreException(cnfe);
                            }
                        } catch (RepositoryException re) {
                            // we ignore this and readd
                            stillUnloaded.add(path);
                            ignoreException(re);
                        }
                    }
                } catch (RepositoryException re) {
                    // unable to create session, so we try it again next time
                    ignoreException(re);
                } finally {
                    if ( s != null ) {
                        s.logout();
                    }
                    unloadedJobs.clear();
                    unloadedJobs.addAll(stillUnloaded);
                }
            }
        }
    };
    this.threadPool.execute(retryTask);
}
/** All characters which are taken over unchanged by {@link #filter(String)}. */
public static final String ALLOWED_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz0123456789_,.-+*#!¤$%&()=[]?";
/** The character which is used in place of a character that is not allowed. */
public static final char REPLACEMENT_CHAR = '_';
/**
 * Filter a node name.
 * Every character not contained in {@link #ALLOWED_CHARS} is replaced by
 * {@link #REPLACEMENT_CHAR}; such a replacement is skipped if the character
 * added last already is the replacement character. A leading digit gets
 * prefixed with the replacement character and an empty result is turned into
 * a single replacement character.
 * @param nodeName The name to filter.
 * @return The filtered name, never empty.
 */
public static String filter(final String nodeName) {
    final int length = nodeName.length();
    final StringBuilder result = new StringBuilder(length + 1);
    char previous = 0;
    for (int index = 0; index < length; index++) {
        final char current = nodeName.charAt(index);
        final boolean allowed = ALLOWED_CHARS.indexOf(current) >= 0;
        if ( !allowed && previous == REPLACEMENT_CHAR ) {
            // do not add several replacement characters in a row
            continue;
        }
        if ( allowed && index == 0 && Character.isDigit(current) ) {
            // the name should not start with a digit
            result.append(REPLACEMENT_CHAR);
        }
        final char emitted = allowed ? current : REPLACEMENT_CHAR;
        result.append(emitted);
        previous = emitted;
    }
    if ( result.length() == 0 ) {
        result.append(REPLACEMENT_CHAR);
    }
    return result.toString();
}
/**
 * Create the node name for the job.
 * A job with a job id gets a name built from its topic and the id, so the
 * same job always maps to the same node; all other jobs get a random name.
 * The name is passed through {@link #filter(String)}.
 */
protected String getNodeName(Event event) {
    final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
    if ( jobId == null ) {
        return filter("Job " + UUID.randomUUID());
    }
    final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
    return filter(jobTopic + " " + jobId);
}
/**
 * Process a job: mark the node with the id of this application and send the
 * job event through the event admin. If the event could not be sent, the job
 * topic is released (unless parallel processing is allowed) and the node is
 * unlocked again.
 * @param event The original event.
 * @param eventNode The node in the repository where the job is stored.
 */
protected void processJob(Event event, Node eventNode) {
    final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
    final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
    boolean sent = false;
    try {
        final Event jobEvent = this.getJobEvent(event, eventNode.getPath());
        eventNode.setProperty(EventHelper.NODE_PROPERTY_PROCESSOR, this.applicationId);
        eventNode.save();
        final EventAdmin localEA = this.eventAdmin;
        if ( localEA == null ) {
            this.logger.error("Job event can't be sent as no event admin is available.");
        } else {
            localEA.sendEvent(jobEvent);
            // the node stays locked if sending was successful
            sent = true;
        }
    } catch (RepositoryException re) {
        // if an exception occurs, we just log
        this.logger.error("Exception during job processing.", re);
    } finally {
        if ( !sent ) {
            if ( !parallelProcessing ) {
                synchronized ( this.processingMap ) {
                    this.processingMap.put(jobTopic, Boolean.FALSE);
                }
            }
            try {
                eventNode.unlock();
            } catch (RepositoryException e) {
                // if unlock fails, we silently ignore this
                this.ignoreException(e);
            }
        }
    }
}
/**
 * Create the job event: an event with the job topic as its topic, carrying
 * all properties of the original event plus the notifier context that is
 * needed for the finished job callback.
 * @param e The original event.
 * @param nodePath The path of the node where the job is stored.
 * @return The event to send.
 */
protected Event getJobEvent(Event e, String nodePath) {
    final Dictionary<String, Object> jobProperties = new Hashtable<String, Object>();
    for (final String name : e.getPropertyNames()) {
        jobProperties.put(name, e.getProperty(name));
    }
    // context for the finished job callback
    final EventUtil.JobStatusNotifier.NotifierContext notifierContext =
        new EventUtil.JobStatusNotifier.NotifierContext(this, nodePath);
    jobProperties.put(EventUtil.JobStatusNotifier.CONTEXT_PROPERTY_NAME, notifierContext);

    final String jobTopic = (String)e.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
    return new Event(jobTopic, jobProperties);
}
/**
 * In addition to the properties written by the super class, the job topic
 * and - if available - the job id are stored on the node.
 * @param eventNode The node where the job is stored.
 * @param event The job event.
 * @throws RepositoryException If a property can't be set.
 */
protected void addNodeProperties(Node eventNode, Event event)
throws RepositoryException {
    super.addNodeProperties(eventNode, event);

    final String topic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
    eventNode.setProperty(EventHelper.NODE_PROPERTY_TOPIC, topic);

    final String id = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
    if ( id == null ) {
        return;
    }
    eventNode.setProperty(EventHelper.NODE_PROPERTY_JOBID, id);
}
/**
 * Reacts on unlocked job nodes: if the lock owner property of a job node has
 * been changed or removed, the node is not locked anymore and the job is not
 * finished, the job is put into the processing queue again. Unlocks
 * registered in the deleted jobs set are skipped.
 * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
 */
public void onEvent(EventIterator iter) {
    // an own session is used here
    Session s = null;
    try {
        s = this.createSession();
        while ( iter.hasNext() ) {
            final javax.jcr.observation.Event event = iter.nextEvent();
            final int type = event.getType();
            if ( type != javax.jcr.observation.Event.PROPERTY_CHANGED
                 && type != javax.jcr.observation.Event.PROPERTY_REMOVED ) {
                continue;
            }
            try {
                final String propPath = event.getPath();
                final int separator = propPath.lastIndexOf('/');
                final String nodePath = propPath.substring(0, separator);
                final String propertyName = propPath.substring(separator + 1);
                // we are only interested in unlocks
                if ( !"jcr:lockOwner".equals(propertyName) ) {
                    continue;
                }
                final boolean suppressed;
                synchronized ( this.deletedJobs ) {
                    suppressed = this.deletedJobs.remove(nodePath);
                }
                if ( suppressed ) {
                    continue;
                }
                final Node eventNode = (Node) s.getItem(nodePath);
                if ( eventNode.isLocked() || eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED) ) {
                    continue;
                }
                try {
                    final EventInfo info = new EventInfo();
                    info.event = this.readEvent(eventNode);
                    info.nodePath = nodePath;
                    try {
                        this.queue.put(info);
                    } catch (InterruptedException e) {
                        // we ignore this exception as this should never occur
                        this.ignoreException(e);
                    }
                } catch (ClassNotFoundException cnfe) {
                    // store path for lazy loading
                    synchronized ( this.unloadedJobs ) {
                        this.unloadedJobs.add(nodePath);
                    }
                    this.ignoreException(cnfe);
                }
            } catch (RepositoryException re) {
                this.logger.error("Exception during jcr event processing.", re);
            }
        }
    } catch (RepositoryException re) {
        this.logger.error("Unable to create a session.", re);
    } finally {
        if ( s != null ) {
            s.logout();
        }
    }
}
/**
 * Load all active jobs from the repository: every job node which is neither
 * locked nor finished is put into the processing queue. Jobs whose classes
 * can't be found are remembered for lazy loading. Repository problems are
 * logged and never propagated.
 */
protected void loadJobs() {
    try {
        final QueryManager qManager = this.backgroundSession.getWorkspace().getQueryManager();
        final String queryString = "/jcr:root" + this.repositoryPath
            + "//element(*, " + this.getEventNodeType() + ")";
        final Query q = qManager.createQuery(queryString, Query.XPATH);
        for (final NodeIterator result = q.execute().getNodes(); result.hasNext(); ) {
            final Node eventNode = result.nextNode();
            if ( eventNode.isLocked() || eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED) ) {
                // in processing by someone or already done
                continue;
            }
            final String nodePath = eventNode.getPath();
            try {
                final EventInfo info = new EventInfo();
                info.event = this.readEvent(eventNode);
                info.nodePath = nodePath;
                try {
                    this.queue.put(info);
                } catch (InterruptedException e) {
                    // we ignore this exception as this should never occur
                    this.ignoreException(e);
                }
            } catch (ClassNotFoundException cnfe) {
                // store path for lazy loading
                synchronized ( this.unloadedJobs ) {
                    this.unloadedJobs.add(nodePath);
                }
                this.ignoreException(cnfe);
            } catch (RepositoryException re) {
                this.logger.error("Unable to load stored job from " + nodePath, re);
            }
        }
    } catch (RepositoryException re) {
        this.logger.error("Exception during initial loading of stored jobs.", re);
    }
}
/**
 * Notification that the processing of a job has ended.
 * <p>
 * If the job should be rescheduled and the maximum number of retries is not
 * exceeded, the job node is unlocked and the job (with an increased retry
 * count) is queued again - delayed, if the job defines a retry delay.
 * Otherwise the job node is unlocked and either removed (job without id) or
 * marked as finished (job with id). In all cases the job topic is released
 * for jobs which do not allow parallel processing.
 *
 * @param job The job event.
 * @param eventNodePath The path of the node where the job is stored.
 * @param shouldReschedule Whether the job failed and should be processed again.
 * @return <code>true</code> if the job is finished for good although no
 *         rescheduling was requested or if the job has been rescheduled;
 *         <code>false</code> if rescheduling was requested but the retries are
 *         exhausted or if the job node could not be accessed.
 * @see org.apache.sling.event.EventUtil.JobStatusNotifier#finishedJob(org.osgi.service.event.Event, String, boolean)
 */
public boolean finishedJob(Event job, String eventNodePath, boolean shouldReschedule) {
    boolean reschedule = shouldReschedule;
    if ( shouldReschedule ) {
        // check if we exceeded the number of retries; any numeric type is
        // accepted for the two properties, not just Integer
        int retries = this.maxJobRetries;
        final Object retriesValue = job.getProperty(EventUtil.PROPERTY_JOB_RETRIES);
        if ( retriesValue != null ) {
            retries = ((Number) retriesValue).intValue();
        }
        int retryCount = 0;
        final Object retryCountValue = job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT);
        if ( retryCountValue != null ) {
            retryCount = ((Number) retryCountValue).intValue();
        }
        retryCount++;
        if ( retryCount > retries ) {
            reschedule = false;
        }
        // events are immutable: create a copy of the job carrying the new retry count
        final Dictionary<String, Object> newProperties = new Hashtable<String, Object>();
        final String[] names = job.getPropertyNames();
        for(int i=0; i<names.length; i++ ) {
            newProperties.put(names[i], job.getProperty(names[i]));
        }
        newProperties.put(EventUtil.PROPERTY_JOB_RETRY_COUNT, retryCount);
        job = new Event(job.getTopic(), newProperties);
    }
    final boolean parallelProcessing = job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
    final String jobTopic = (String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
    // we have to use the same session for unlocking that we used for locking!
    synchronized ( this.backgroundSession ) {
        final Node eventNode;
        try {
            this.backgroundSession.refresh(false);
            eventNode = (Node) this.backgroundSession.getItem(eventNodePath);
        } catch (RepositoryException re) {
            // The processing of the job has ended even if its node is not
            // accessible. The topic must be released here as well, otherwise
            // no further job of this topic would ever be processed.
            if ( !parallelProcessing ) {
                synchronized ( this.processingMap ) {
                    this.processingMap.put(jobTopic, Boolean.FALSE);
                }
            }
            this.logger.error("Unable to access job node '" + eventNodePath + "' to finish the job.", re);
            return false;
        }
        // unlock == true means: the node still has to be unlocked in the finally block
        boolean unlock = true;
        try {
            if ( !reschedule ) {
                // the unlock event for this node must not requeue the job
                synchronized ( this.deletedJobs ) {
                    this.deletedJobs.add(eventNodePath);
                }
                // unlock node
                try {
                    eventNode.unlock();
                } catch (RepositoryException e) {
                    // if unlock fails, we silently ignore this
                    this.ignoreException(e);
                }
                unlock = false;
                final String jobId = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
                if ( jobId == null ) {
                    // remove node from repository if no job id set
                    final Node parentNode = eventNode.getParent();
                    eventNode.remove();
                    parentNode.save();
                } else {
                    // jobs with an id are kept and marked as finished
                    eventNode.setProperty(EventHelper.NODE_PROPERTY_FINISHED, Calendar.getInstance());
                    eventNode.save();
                }
            }
        } catch (RepositoryException re) {
            // if an exception occurs, we just log
            this.logger.error("Exception during job finishing.", re);
        } finally {
            if ( !parallelProcessing ) {
                synchronized ( this.processingMap ) {
                    this.processingMap.put(jobTopic, Boolean.FALSE);
                }
            }
            if ( unlock ) {
                // the job gets requeued below, so the unlock event has to be ignored
                synchronized ( this.deletedJobs ) {
                    this.deletedJobs.add(eventNodePath);
                }
                // unlock node
                try {
                    eventNode.unlock();
                } catch (RepositoryException e) {
                    // if unlock fails, we silently ignore this
                    this.ignoreException(e);
                }
            }
        }
        if ( reschedule ) {
            final EventInfo info = new EventInfo();
            try {
                info.event = job;
                info.nodePath = eventNode.getPath();
            } catch (RepositoryException e) {
                // this should never happen
                this.ignoreException(e);
            }
            // delay rescheduling?
            final Object delayValue = job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
            if ( delayValue != null ) {
                final long delay = ((Number) delayValue).longValue();
                final Runnable t = new Runnable() {
                    public void run() {
                        try {
                            Thread.sleep(delay);
                        } catch (InterruptedException e) {
                            // this should never happen
                            ignoreException(e);
                        }
                        try {
                            queue.put(info);
                        } catch (InterruptedException e) {
                            // this should never happen
                            ignoreException(e);
                        }
                    }
                };
                this.threadPool.execute(t);
            } else {
                // put directly into queue
                try {
                    this.queue.put(info);
                } catch (InterruptedException e) {
                    // this should never happen
                    this.ignoreException(e);
                }
            }
        }
        // no rescheduling requested: always a success; otherwise report
        // whether the job has really been rescheduled
        return !shouldReschedule || reschedule;
    }
}
/**
* Hands the given runnable over to the thread pool of this handler.
* @param job The runnable to execute.
* @see org.apache.sling.event.EventUtil.JobStatusNotifier#execute(java.lang.Runnable)
*/
public void execute(Runnable job) {
this.threadPool.execute(job);
}
/**
 * Search for job nodes and return the events stored in them.
 * <p>
 * The XPath predicate is assembled from the optional restrictions only, so
 * the query is well-formed for every combination of the arguments (without
 * any restriction no predicate is added at all). Properties are addressed as
 * attributes and quotes inside the topic are escaped.
 *
 * @param topic The job topic to search for or <code>null</code> for all topics.
 * @param locked If <code>true</code> only nodes with a lock owner are returned.
 * @return The events of the matching jobs. Jobs which can't be loaded because
 *         of a missing class are left out; the list collected so far (possibly
 *         empty) is returned if the repository reports an error.
 */
protected Collection<Event> queryCurrentJobs(String topic, boolean locked) {
    // collect the restrictions first
    final List<String> predicates = new ArrayList<String>();
    if ( topic != null ) {
        // a quote inside an XPath string literal is escaped by doubling it
        predicates.add("@" + EventHelper.NODE_PROPERTY_TOPIC + " = '" + topic.replace("'", "''") + "'");
    }
    if ( locked ) {
        predicates.add("@jcr:lockOwner");
    }

    final StringBuilder buffer = new StringBuilder("/jcr:root");
    buffer.append(this.repositoryPath);
    buffer.append("//element(*, ");
    buffer.append(this.getEventNodeType());
    buffer.append(")");
    if ( !predicates.isEmpty() ) {
        buffer.append(" [");
        for (int i = 0; i < predicates.size(); i++) {
            if ( i > 0 ) {
                buffer.append(" and ");
            }
            buffer.append(predicates.get(i));
        }
        buffer.append("]");
    }

    // we create a new session
    Session s = null;
    final List<Event> jobs = new ArrayList<Event>();
    try {
        s = this.createSession();
        final QueryManager qManager = s.getWorkspace().getQueryManager();
        final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
        final NodeIterator iter = q.execute().getNodes();
        while ( iter.hasNext() ) {
            final Node eventNode = iter.nextNode();
            try {
                final Event event = this.readEvent(eventNode);
                jobs.add(event);
            } catch (ClassNotFoundException cnfe) {
                // in the case of a class not found exception we just ignore the exception
                this.ignoreException(cnfe);
            }
        }
    } catch (RepositoryException e) {
        // in the case of an error, we return what has been collected so far
        this.ignoreException(e);
    } finally {
        if ( s != null) {
            s.logout();
        }
    }
    return jobs;
}
/**
* Returns the jobs found by queryCurrentJobs() with the locked flag set,
* i.e. the search is restricted to job nodes with a lock owner.
* @param topic The job topic, passed on unchanged to queryCurrentJobs().
* @return The events of the matching jobs.
* @see org.apache.sling.event.JobStatusProvider#getCurrentJobs(java.lang.String)
*/
public Collection<Event> getCurrentJobs(String topic) {
return this.queryCurrentJobs(topic, true);
}
/**
* Returns the jobs found by queryCurrentJobs() without the locked flag,
* i.e. the search is not restricted to job nodes with a lock owner.
* @param topic The job topic, passed on unchanged to queryCurrentJobs().
* @return The events of the matching jobs.
* @see org.apache.sling.event.JobStatusProvider#scheduledJobs(java.lang.String)
*/
public Collection<Event> scheduledJobs(String topic) {
return this.queryCurrentJobs(topic, false);
}
}