blob: 62dc55a3c68303eb9019b6a10e813fd468ab4ac2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.event.impl.jobs;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.PropertyOption;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.felix.scr.annotations.Services;
import org.apache.sling.commons.osgi.OsgiUtil;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.impl.EnvironmentComponent;
import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.queues.AbstractJobQueue;
import org.apache.sling.event.impl.jobs.queues.OrderedJobQueue;
import org.apache.sling.event.impl.jobs.queues.ParallelJobQueue;
import org.apache.sling.event.impl.jobs.queues.TopicRoundRobinJobQueue;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.JobsIterator;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.Statistics;
import org.apache.sling.event.jobs.TopicStatistics;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An event handler for special job events.
*
* We schedule this event handler to run in the background
* and clean up obsolete queues.
*
*/
@Component(label="%job.events.name",
description="%job.events.description",
metatype=true,immediate=true)
@Services({
@Service(value=Runnable.class),
@Service(value=JobManager.class),
@Service(value=EventHandler.class)
})
@Properties({
@Property(name="scheduler.period", longValue=60, propertyPrivate=true),
@Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true),
@Property(name=ConfigurationConstants.PROP_PRIORITY,
value=ConfigurationConstants.DEFAULT_PRIORITY,
options={@PropertyOption(name="NORM",value="Norm"),
@PropertyOption(name="MIN",value="Min"),
@PropertyOption(name="MAX",value="Max")}),
@Property(name=ConfigurationConstants.PROP_RETRIES,
intValue=ConfigurationConstants.DEFAULT_RETRIES),
@Property(name=ConfigurationConstants.PROP_RETRY_DELAY,
longValue=ConfigurationConstants.DEFAULT_RETRY_DELAY),
@Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL),
@Property(name="event.topics",propertyPrivate=true,
value={"org/apache/sling/event/notification/job/*"})
})
public class DefaultJobManager
extends StatisticsImpl
implements Runnable, JobManager, EventHandler {
/** Default logger. */
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/** The environment component. */
@Reference
private EnvironmentComponent environment;
/** The configuration manager. */
@Reference
private QueueConfigurationManager configManager;
/** The scheduler service. */
@Reference
private Scheduler scheduler;
/** Lock object for the queues map - we don't want to sync directly on the concurrent map. */
private final Object queuesLock = new Object();
/** All active queues. */
private final Map<String, AbstractJobQueue> queues = new ConcurrentHashMap<String, AbstractJobQueue>();
/** Main configuration. */
private InternalQueueConfiguration mainConfiguration;
/** Current statistics. */
private final StatisticsImpl baseStatistics = new StatisticsImpl();
/** Last update for current statistics. */
private long lastUpdatedStatistics;
/** All existing events. */
private final Map<String, JobEvent> allEvents = new HashMap<String, JobEvent>();
/** All existing events by topic. */
private final Map<String, List<JobEvent>> allEventsByTopic = new HashMap<String, List<JobEvent>>();
/** Statistics per topic. */
private final ConcurrentMap<String, TopicStatistics> topicStatistics = new ConcurrentHashMap<String, TopicStatistics>();
private static final boolean DEFAULT_ENABLED = true;
@Property(boolValue=DEFAULT_ENABLED)
private static final String PROP_ENABLED = "jobmanager.enabled";
private boolean enabled = DEFAULT_ENABLED;
/** We count the scheduler runs. */
private long schedulerRuns;
/**
* Activate this component.
* @param props Configuration properties
*/
@Activate
protected void activate(final Map<String, Object> props) {
this.update(props);
logger.info("Apache Sling Job Event Handler started on instance {}", Environment.APPLICATION_ID);
}
/**
* Configure this component.
* @param props Configuration properties
*/
@Modified
protected void update(final Map<String, Object> props) {
// create a new dictionary with the missing info and do some sanety puts
final Map<String, Object> queueProps = new HashMap<String, Object>(props);
queueProps.remove(ConfigurationConstants.PROP_APP_IDS);
queueProps.put(ConfigurationConstants.PROP_TOPICS, "*");
queueProps.put(ConfigurationConstants.PROP_NAME, "<main queue>");
queueProps.put(ConfigurationConstants.PROP_RUN_LOCAL, false);
queueProps.put(ConfigurationConstants.PROP_TYPE, InternalQueueConfiguration.Type.UNORDERED);
// check max parallel - this should never be lower than 2!
final int maxParallel = OsgiUtil.toInteger(queueProps.get(ConfigurationConstants.PROP_MAX_PARALLEL),
ConfigurationConstants.DEFAULT_MAX_PARALLEL);
if ( maxParallel < 2 ) {
this.logger.debug("Ignoring invalid setting of {} for {}. Setting to minimum value: 2",
maxParallel, ConfigurationConstants.PROP_MAX_PARALLEL);
queueProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, 2);
}
this.mainConfiguration = InternalQueueConfiguration.fromConfiguration(queueProps);
final boolean oldEnabled = this.enabled;
this.enabled = OsgiUtil.toBoolean(props.get(PROP_ENABLED), DEFAULT_ENABLED);
// if we have been disabled before and now get enabled, restart to get processing going
if ( this.enabled != oldEnabled && this.enabled ) {
this.restart();
}
}
/**
* Dectivate this component.
*/
@Deactivate
protected void deactivate() {
final Iterator<AbstractJobQueue> i = this.queues.values().iterator();
while ( i.hasNext() ) {
final AbstractJobQueue jbq = i.next();
jbq.close();
}
this.queues.clear();
logger.info("Apache Sling Job Event Handler stopped on instance {}", Environment.APPLICATION_ID);
}
/**
* This method is invoked periodically by the scheduler.
* It searches for idle queues and stops them after a timeout. If a queue
* is idle for two consecutive clean up calls, it is removed.
* @see java.lang.Runnable#run()
*/
private void cleanup() {
logger.debug("cleanup: Starting #{}", this.schedulerRuns + 1);
// check for unprocessed jobs first
for(final AbstractJobQueue jbq : this.queues.values() ) {
jbq.checkForUnprocessedJobs();
}
// we only do a full clean up on every fifth run
this.schedulerRuns++;
final boolean doFullCleanUp = (schedulerRuns % 5 == 0);
if ( doFullCleanUp ) {
// check for idle queue
logger.debug("cleanup: doing full cleanup");
// we synchronize to avoid creating a queue which is about to be removed during cleanup
synchronized ( queuesLock ) {
final Iterator<Map.Entry<String, AbstractJobQueue>> i = this.queues.entrySet().iterator();
while ( i.hasNext() ) {
final Map.Entry<String, AbstractJobQueue> current = i.next();
final AbstractJobQueue jbq = current.getValue();
if ( jbq.isMarkedForRemoval() ) {
logger.debug("cleanup: Removing idle Job Queue {}", jbq);
// close
jbq.close();
// copy statistics
this.baseStatistics.add(jbq);
// remove
i.remove();
} else {
// mark to be removed during next cycle
jbq.markForRemoval();
}
}
}
}
// we do a sanity check every 12th run
final boolean doSanityCheck = (schedulerRuns % 12 == 0);
if ( doSanityCheck ) {
logger.debug("cleanup: running sanity check");
final List<JobEvent> removedEvents = new ArrayList<JobEvent>();
final Map<String, JobEvent> currentEvents;
synchronized (this.allEvents) {
currentEvents = new HashMap<String, JobEvent>(this.allEvents);
}
for (final Map.Entry<String, JobEvent> entry : currentEvents.entrySet()) {
final JobEvent job = entry.getValue();
if (!job.isAlive()) {
synchronized (this.allEvents) {
logger.debug("cleanup: Removing dead job {}", job);
this.allEvents.remove(entry.getKey());
}
removedEvents.add(job);
}
}
for(final JobEvent removedJob : removedEvents) {
final String topic = (String)removedJob.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
final List<JobEvent> l;
synchronized ( this.allEventsByTopic ) {
l = this.allEventsByTopic.get(topic);
}
if ( l != null ) {
synchronized ( l ) {
l.remove(removedJob);
}
}
}
}
}
/**
* Process a new job event.
* This method first searches the corresponding queue - if such a queue
* does not exist yet, it is created and started.
* @param event The job event
*/
public void process(final JobEvent event) {
// are we disabled?
if ( !this.enabled ) {
if ( logger.isDebugEnabled() ) {
logger.debug("Job manager is disabled. Ignoring job {}", EventUtil.toString(event.event));
}
return;
}
// get the queue configuration
InternalQueueConfiguration config = configManager.getQueueConfiguration(event);
// if no queue config is found, we either create a new queue or use the main queue
if ( config == null ) {
final String customQueueName = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_QUEUE_NAME);
if ( customQueueName != null ) {
synchronized ( queuesLock ) {
final AbstractJobQueue queue = this.queues.get(customQueueName);
if ( queue != null ) {
config = queue.getConfiguration();
} else {
config = new InternalQueueConfiguration(event.event);
}
event.queueName = customQueueName;
}
} else {
config = this.mainConfiguration;
event.queueName = this.mainConfiguration.getName();
}
}
// get the queue name
final String queueName = event.queueName;
if ( config.isSkipped(event) ) {
if ( logger.isDebugEnabled() ) {
logger.debug("Ignoring job due to configuration of queue {} : {}", queueName, EventUtil.toString(event.event));
}
return;
}
// drop?
if ( config.getType() == QueueConfiguration.Type.DROP ) {
if ( logger.isDebugEnabled() ) {
logger.debug("Dropping job due to configuration of queue {} : {}", queueName, EventUtil.toString(event.event));
}
Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_CANCELLED, event.event, null);
event.finished();
return;
}
// get or create queue
AbstractJobQueue queue = null;
// we synchronize to avoid creating a queue which is about to be removed during cleanup
synchronized ( queuesLock ) {
queue = this.queues.get(queueName);
// check for reconfiguration, we really do an identity check here(!)
if ( queue != null && queue.getConfiguration() != config ) {
this.outdateQueue(queue);
// we use a new queue with the configuration
queue = null;
}
if ( queue == null ) {
if ( config.getType() == QueueConfiguration.Type.ORDERED ) {
queue = new OrderedJobQueue(queueName, config, this.environment);
} else if ( config.getType() == QueueConfiguration.Type.UNORDERED ) {
queue = new ParallelJobQueue(queueName, config, this.environment, this.scheduler);
} else if ( config.getType() == QueueConfiguration.Type.TOPIC_ROUND_ROBIN ) {
queue = new TopicRoundRobinJobQueue(queueName, config, this.environment, this.scheduler);
}
if ( queue == null ) {
// this is just a sanety check, actually we can never get here
logger.warn("Ignoring event due to unknown queue type of queue {} : {}", queueName, EventUtil.toString(event.event));
return;
}
queues.put(queueName, queue);
queue.start();
}
}
// and put event
queue.process(event);
}
/**
* This method is invoked periodically by the scheduler.
* @see java.lang.Runnable#run()
*/
public void run() {
this.cleanup();
}
/**
* Return our internal statistics object.
* We recalculate this every 1.5sec (if requested)
*
* @see org.apache.sling.event.jobs.JobManager#getStatistics()
*/
public synchronized Statistics getStatistics() {
final long now = System.currentTimeMillis();
if ( this.lastUpdatedStatistics + 1500 < now ) {
this.copyFrom(this.baseStatistics);
for(final AbstractJobQueue jq : this.queues.values() ) {
this.add(jq);
}
}
return this;
}
/**
* @see org.apache.sling.event.jobs.JobManager#getQueue(java.lang.String)
*/
public Queue getQueue(final String name) {
return this.queues.get(name);
}
/**
* @see org.apache.sling.event.jobs.JobManager#getQueues()
*/
public Iterable<Queue> getQueues() {
final Iterator<AbstractJobQueue> jqI = this.queues.values().iterator();
return new Iterable<Queue>() {
public Iterator<Queue> iterator() {
return new Iterator<Queue>() {
public boolean hasNext() {
return jqI.hasNext();
}
public Queue next() {
return jqI.next();
}
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
public InternalQueueConfiguration getMainQueueConfiguration() {
return this.mainConfiguration;
}
/**
* Add a job to all jobs.
*/
public void notifyAddJob(final JobEvent job) {
final String key = job.uniqueId;
final String topic = (String)job.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
final JobEvent oldJob;
synchronized ( this.allEvents ) {
oldJob = this.allEvents.put(key, job);
}
List<JobEvent> l;
synchronized ( this.allEventsByTopic ) {
l = this.allEventsByTopic.get(topic);
if ( l == null ) {
l = new ArrayList<JobEvent>();
this.allEventsByTopic.put(topic, l);
}
}
synchronized ( l ) {
if ( oldJob != null ) {
l.remove(oldJob);
}
l.add(job);
}
}
/**
* Remove a job from all jobs.
*/
public void notifyRemoveJob(final String key) {
final JobEvent oldJob;
synchronized ( this.allEvents ) {
oldJob = this.allEvents.remove(key);
}
if ( oldJob != null ) {
final String topic = (String)oldJob.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
final List<JobEvent> l;
synchronized ( this.allEventsByTopic ) {
l = this.allEventsByTopic.get(topic);
}
if ( l != null ) {
synchronized ( l ) {
l.remove(oldJob);
}
}
}
}
/**
* Job started
*/
public void notifyActiveJob(final String key) {
final JobEvent job;
synchronized ( this.allEvents ) {
job = this.allEvents.get(key);
}
if ( job != null ) {
job.started = 1;
}
}
/**
* Job started
*/
public void notifyRescheduleJob(final String key) {
final JobEvent job;
synchronized ( this.allEvents ) {
job = this.allEvents.get(key);
}
if ( job != null ) {
job.started = -1;
}
}
/**
* Check the requested job type
*/
private boolean checkType(final QueryType type, final JobEvent event) {
if ( type == QueryType.ALL ) {
return true;
}
if ( type == QueryType.ACTIVE && event.started == 1 ) {
return true;
}
if ( type == QueryType.QUEUED && event.started == -1 ) {
return true;
}
return false;
}
private enum Operation {
LESS,
LESS_OR_EQUALS,
EQUALS,
GREATER_OR_EQUALS,
GREATER
}
/**
* Check if the job matches the template
*/
private boolean match(final JobEvent job, final Map<String, Object> template) {
if ( template != null ) {
for(final Map.Entry<String, Object> current : template.entrySet()) {
final String key = current.getKey();
final char firstChar = key.length() > 0 ? key.charAt(0) : 0;
final String propName;
final Operation op;
if ( firstChar == '=' ) {
propName = key.substring(1);
op = Operation.EQUALS;
} else if ( firstChar == '<' ) {
final char secondChar = key.length() > 1 ? key.charAt(1) : 0;
if ( secondChar == '=' ) {
op = Operation.LESS_OR_EQUALS;
propName = key.substring(2);
} else {
op = Operation.LESS;
propName = key.substring(1);
}
} else if ( firstChar == '>' ) {
final char secondChar = key.length() > 1 ? key.charAt(1) : 0;
if ( secondChar == '=' ) {
op = Operation.GREATER_OR_EQUALS;
propName = key.substring(2);
} else {
op = Operation.GREATER;
propName = key.substring(1);
}
} else {
propName = key;
op = Operation.EQUALS;
}
final Object value = current.getValue();
if ( op == Operation.EQUALS ) {
if ( !value.equals(job.event.getProperty(propName)) ) {
return false;
}
} else {
if ( value instanceof Comparable ) {
@SuppressWarnings({ "unchecked", "rawtypes" })
final int result = ((Comparable)value).compareTo(job.event.getProperty(propName));
if ( op == Operation.LESS && result != -1 ) {
return false;
} else if ( op == Operation.LESS_OR_EQUALS && result == 1 ) {
return false;
} else if ( op == Operation.GREATER_OR_EQUALS && result == -1 ) {
return false;
} else if ( op == Operation.GREATER && result != 1 ) {
return false;
}
} else {
// if the value is not comparable we simply don't match
return false;
}
}
}
}
return true;
}
private boolean queryCollection(
final List<Event> result,
final QueryType type,
final Collection<JobEvent> collection,
final long limit,
final Map<String, Object>... filterProps) {
synchronized ( collection ) {
final Iterator<JobEvent> iter = collection.iterator();
while ( iter.hasNext() ) {
final JobEvent job = iter.next();
boolean add = checkType(type, job);
if ( add && filterProps != null && filterProps.length != 0 ) {
add = false;
for (Map<String,Object> template : filterProps) {
add = this.match(job, template);
if ( add ) {
break;
}
}
}
if ( add ) {
result.add(job.event);
if ( limit > 0 && result.size() == limit ) {
return true;
}
}
}
}
return false;
}
/**
* @see org.apache.sling.event.jobs.JobManager#queryJobs(QueryType, java.lang.String, java.util.Map...)
*/
public JobsIterator queryJobs(final QueryType type,
final String topic,
final Map<String, Object>... filterProps) {
return this.queryJobs(type, topic, -1, filterProps);
}
/**
* @see org.apache.sling.event.jobs.JobManager#queryJobs(QueryType, java.lang.String, long, java.util.Map...)
*/
public JobsIterator queryJobs(final QueryType type,
final String topic,
final long limit,
final Map<String, Object>... filterProps) {
final List<Event> result = new ArrayList<Event>();
if ( topic != null ) {
final List<JobEvent> l;
synchronized ( this.allEventsByTopic ) {
l = this.allEventsByTopic.get(topic);
}
if ( l != null ) {
queryCollection(result, type, l, limit, filterProps);
}
} else {
final Set<Collection<JobEvent>> topics;
synchronized ( this.allEventsByTopic ) {
topics = new HashSet<Collection<JobEvent>>(this.allEventsByTopic.values());
}
boolean done = false;
final Iterator<Collection<JobEvent>> i = topics.iterator();
while ( !done && i.hasNext() ) {
final Collection<JobEvent> l = i.next();
done = queryCollection(result, type, l, limit, filterProps);
}
}
return new JobsIteratorImpl(result);
}
/**
* @see org.apache.sling.event.jobs.JobManager#findJob(java.lang.String, java.util.Map)
*/
public Event findJob(final String topic, final Map<String, Object> template) {
Event result = null;
if ( topic != null ) {
final List<JobEvent> l;
synchronized ( this.allEventsByTopic ) {
l = this.allEventsByTopic.get(topic);
}
if ( l != null ) {
synchronized ( l ) {
final Iterator<JobEvent> iter = l.iterator();
while ( result == null && iter.hasNext() ) {
final JobEvent job = iter.next();
if ( match(job, template) ) {
result = job.event;
}
}
}
}
}
return result;
}
/**
* @see org.apache.sling.event.jobs.JobManager#removeJob(java.lang.String)
*/
public boolean removeJob(final String jobId) {
final JobEvent job;
synchronized ( this.allEvents ) {
job = this.allEvents.get(jobId);
}
boolean result = true;
if ( job != null ) {
if ( job.started != 1 ) {
result = job.remove();
} else {
result = false;
}
}
return result;
}
/**
* @see org.apache.sling.event.jobs.JobManager#forceRemoveJob(java.lang.String)
*/
public void forceRemoveJob(final String jobId) {
while ( !this.removeJob(jobId) ) {
// instead of using complicated syncs, waits and notifies we simply poll
try {
Thread.sleep(80);
} catch (final InterruptedException ignore) {
this.ignoreException(ignore);
}
}
}
/**
* Helper method which just logs the exception in debug mode.
* @param e
*/
private void ignoreException(final Exception e) {
if ( this.logger.isDebugEnabled() ) {
this.logger.debug("Ignored exception " + e.getMessage(), e);
}
}
/**
* @see org.apache.sling.event.impl.jobs.StatisticsImpl#reset()
* Reset this statistics and all queues.
*/
public synchronized void reset() {
this.baseStatistics.reset();
for(final AbstractJobQueue jq : this.queues.values() ) {
jq.reset();
}
this.topicStatistics.clear();
this.lastUpdatedStatistics = 0;
}
/**
* @see org.apache.sling.event.jobs.JobManager#getTopicStatistics()
*/
public Iterable<TopicStatistics> getTopicStatistics() {
return topicStatistics.values();
}
/**
* @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
*/
public void handleEvent(final Event event) {
final Event job = (Event)event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
if ( job != null ) {
final String topic = (String)job.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
if ( topic != null ) { // this is just a sanity check
TopicStatisticsImpl ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
if ( ts == null ) {
this.topicStatistics.putIfAbsent(topic, new TopicStatisticsImpl(topic));
ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
}
if ( event.getTopic().equals(JobUtil.TOPIC_JOB_CANCELLED) ) {
ts.addCancelled();
} else if ( event.getTopic().equals(JobUtil.TOPIC_JOB_FAILED) ) {
ts.addFailed();
} else if ( event.getTopic().equals(JobUtil.TOPIC_JOB_FINISHED) ) {
final Long time = (Long)event.getProperty(Utility.PROPERTY_TIME);
ts.addFinished(time == null ? -1 : time);
} else if ( event.getTopic().equals(JobUtil.TOPIC_JOB_STARTED) ) {
final Long time = (Long)event.getProperty(Utility.PROPERTY_TIME);
ts.addActivated(time == null ? -1 : time);
}
}
}
}
private void outdateQueue(final AbstractJobQueue queue) {
// remove the queue with the old name
this.queues.remove(queue.getName());
// check if we can close or have to rename
queue.markForRemoval();
if ( queue.isMarkedForRemoval() ) {
// close
queue.close();
// copy statistics
this.baseStatistics.add(queue);
} else {
// notify queue
queue.rename(queue.getName() + "<outdated>(" + queue.hashCode() + ")");
// readd with new name
this.queues.put(queue.getName(), queue);
}
}
/**
* @see org.apache.sling.event.jobs.JobManager#restart()
*/
public void restart() {
// let's rename/close all queues first
synchronized ( queuesLock ) {
final List<AbstractJobQueue> queues = new ArrayList<AbstractJobQueue>(this.queues.values());
for(final AbstractJobQueue queue : queues ) {
this.outdateQueue(queue);
}
}
// reset statistics
this.reset();
// restart all jobs - we first copy all of them
final List<JobEvent> jobs;
synchronized ( this.allEvents ) {
jobs = new ArrayList<JobEvent>(this.allEvents.values());
this.allEvents.clear();
}
synchronized ( this.allEventsByTopic ) {
this.allEventsByTopic.clear();
}
for(final JobEvent job : jobs) {
job.restart();
}
}
/**
* @see org.apache.sling.event.jobs.JobManager#isJobProcessingEnabled()
*/
public boolean isJobProcessingEnabled() {
return this.enabled;
}
}