blob: b76fda484f6b556f48803fa3ab24232eba427a3a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.event.impl.jobs;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.jackrabbit.util.ISO9075;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.QuerySyntaxException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.observation.ResourceChange;
import org.apache.sling.api.resource.observation.ResourceChangeListener;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
import org.apache.sling.event.impl.jobs.queues.JobQueueImpl;
import org.apache.sling.event.impl.jobs.queues.QueueManager;
import org.apache.sling.event.impl.jobs.scheduling.JobSchedulerImpl;
import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
import org.apache.sling.event.impl.jobs.tasks.CleanUpTask;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.Job.JobState;
import org.apache.sling.event.jobs.JobBuilder;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.NotificationConstants;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.ScheduledJobInfo;
import org.apache.sling.event.jobs.Statistics;
import org.apache.sling.event.jobs.TopicStatistics;
import org.apache.sling.event.jobs.jmx.QueuesMBean;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventConstants;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of the job manager.
*/
@Component(immediate=true)
@Service(value={JobManager.class, EventHandler.class, Runnable.class})
@Properties({
@Property(name="scheduler.period", longValue=60),
@Property(name="scheduler.concurrent", boolValue=false),
@Property(name=EventConstants.EVENT_TOPIC,
value={ResourceHelper.BUNDLE_EVENT_STARTED,
ResourceHelper.BUNDLE_EVENT_UPDATED})
})
public class JobManagerImpl
implements JobManager, EventHandler, Runnable {
/** Default logger. */
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Reference
private EventAdmin eventAdmin;
@Reference
private Scheduler scheduler;
@Reference
private JobConsumerManager jobConsumerManager;
@Reference
private QueuesMBean queuesMBean;
@Reference
private ThreadPoolManager threadPoolManager;
/** The job manager configuration. */
@Reference
private JobManagerConfiguration configuration;
@Reference
private StatisticsManager statisticsManager;
@Reference
private QueueManager qManager;
private volatile CleanUpTask maintenanceTask;
/** Job Scheduler. */
private org.apache.sling.event.impl.jobs.scheduling.JobSchedulerImpl jobScheduler;
private volatile ServiceRegistration<ResourceChangeListener> changeListenerReg;
/**
* Activate this component.
* @param props Configuration properties
*/
@Activate
protected void activate(final BundleContext ctx, final Map<String, Object> props) throws LoginException {
this.jobScheduler = new org.apache.sling.event.impl.jobs.scheduling.JobSchedulerImpl(this.configuration, this.scheduler, this);
this.maintenanceTask = new CleanUpTask(this.configuration, this.jobScheduler);
final Dictionary<String, Object> regProps = new Hashtable<>();
regProps.put(ResourceChangeListener.PATHS, this.configuration.getScheduledJobsPath(false));
regProps.put(ResourceChangeListener.CHANGES, new String[] {
ResourceChange.ChangeType.ADDED.name(),
ResourceChange.ChangeType.CHANGED.name(),
ResourceChange.ChangeType.REMOVED.name()
});
regProps.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
regProps.put(Constants.SERVICE_DESCRIPTION, "Resource change listener for scheduled jobs");
this.changeListenerReg = ctx.registerService(ResourceChangeListener.class, this.jobScheduler, regProps);
logger.info("Apache Sling Job Manager started on instance {}", Environment.APPLICATION_ID);
}
/**
* Deactivate this component.
*/
@Deactivate
protected void deactivate() {
logger.debug("Apache Sling Job Manager stopping on instance {}", Environment.APPLICATION_ID);
if ( this.changeListenerReg != null ) {
this.changeListenerReg.unregister();
this.changeListenerReg = null;
}
this.jobScheduler.deactivate();
this.maintenanceTask = null;
logger.info("Apache Sling Job Manager stopped on instance {}", Environment.APPLICATION_ID);
}
/**
* This method is invoked periodically by the scheduler.
* In the default configuration every minute
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
// invoke maintenance task
final CleanUpTask task = this.maintenanceTask;
if ( task != null ) {
task.run();
}
}
/**
* @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
*/
@Override
public void handleEvent(final Event event) {
this.jobScheduler.handleEvent(event);
}
/**
* Return our internal statistics object.
*
* @see org.apache.sling.event.jobs.JobManager#getStatistics()
*/
@Override
public synchronized Statistics getStatistics() {
return this.statisticsManager.getGlobalStatistics();
}
/**
* @see org.apache.sling.event.jobs.JobManager#getTopicStatistics()
*/
@Override
public Iterable<TopicStatistics> getTopicStatistics() {
return this.statisticsManager.getTopicStatistics().values();
}
/**
* @see org.apache.sling.event.jobs.JobManager#getQueue(java.lang.String)
*/
@Override
public Queue getQueue(final String name) {
return qManager.getQueue(ResourceHelper.filterQueueName(name));
}
/**
* @see org.apache.sling.event.jobs.JobManager#getQueues()
*/
@Override
public Iterable<Queue> getQueues() {
return qManager.getQueues();
}
/**
* Remove a job.
* If the job is already in the storage area, it's removed forever.
* Otherwise it's moved to the storage area.
*/
private boolean internalRemoveJobById(final String jobId, final boolean forceRemove) {
logger.debug("Trying to remove job {}", jobId);
boolean result = true;
JobImpl job = (JobImpl)this.getJobById(jobId);
if ( job != null ) {
if ( logger.isDebugEnabled() ) {
logger.debug("Found removal job: {}", Utility.toString(job));
}
final JobImpl retryJob = (JobImpl)this.configuration.getJobFromRetryList(jobId);
if ( retryJob != null ) {
job = retryJob;
}
// currently running?
if ( !forceRemove && job.getProcessingStarted() != null ) {
if ( logger.isDebugEnabled() ) {
logger.debug("Unable to remove job - job is started: {}", Utility.toString(job));
}
result = false;
} else {
final boolean isHistoryJob = this.configuration.isStoragePath(job.getResourcePath());
// if history job, simply remove - otherwise move to history!
if ( isHistoryJob ) {
final ResourceResolver resolver = this.configuration.createResourceResolver();
try {
final Resource jobResource = resolver.getResource(job.getResourcePath());
if ( jobResource != null ) {
resolver.delete(jobResource);
resolver.commit();
logger.debug("Removed job with id: {}", jobId);
} else {
logger.debug("Unable to remove job with id - resource already removed: {}", jobId);
}
NotificationUtility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_REMOVED, job, null);
} catch ( final PersistenceException pe) {
logger.warn("Unable to remove job at " + job.getResourcePath(), pe);
result = false;
} finally {
resolver.close();
}
} else {
final JobHandler jh = new JobHandler(job, null, this.configuration);
jh.finished(Job.JobState.DROPPED, true, null);
}
this.configuration.getAuditLogger().debug("REMOVE OK : {}", jobId);
}
} else {
logger.debug("Job for removal does not exist (anymore): {}", jobId);
}
return result;
}
/**
* @see org.apache.sling.event.jobs.JobManager#addJob(java.lang.String, java.util.Map)
*/
@Override
public Job addJob(String topic, Map<String, Object> properties) {
return this.addJob(topic, properties, null);
}
/**
* @see org.apache.sling.event.jobs.JobManager#getJobById(java.lang.String)
*/
@Override
public Job getJobById(final String id) {
logger.debug("Getting job by id: {}", id);
final ResourceResolver resolver = this.configuration.createResourceResolver();
final StringBuilder buf = new StringBuilder(64);
try {
buf.append("/jcr:root");
buf.append(this.configuration.getJobsBasePathWithSlash());
buf.append("/element(*,");
buf.append(ResourceHelper.RESOURCE_TYPE_JOB);
buf.append(")[@");
buf.append(ResourceHelper.PROPERTY_JOB_ID);
buf.append(" = '");
buf.append(id);
buf.append("']");
if ( logger.isDebugEnabled() ) {
logger.debug("Exceuting query: {}", buf.toString());
}
final Iterator<Resource> result = resolver.findResources(buf.toString(), "xpath");
while ( result.hasNext() ) {
final Resource jobResource = result.next();
// sanity check for the path
if ( this.configuration.isJob(jobResource.getPath()) ) {
final JobImpl job = Utility.readJob(logger, jobResource);
if ( job != null ) {
if ( logger.isDebugEnabled() ) {
logger.debug("Found job with id {} = {}", id, Utility.toString(job));
}
return job;
}
}
}
} catch (final QuerySyntaxException qse) {
logger.warn("Query syntax wrong " + buf.toString(), qse);
} finally {
resolver.close();
}
logger.debug("Job not found with id: {}", id);
return null;
}
/**
* @see org.apache.sling.event.jobs.JobManager#getJob(java.lang.String, java.util.Map)
*/
@SuppressWarnings("unchecked")
@Override
public Job getJob(final String topic, final Map<String, Object> template) {
final Iterable<Job> iter;
if ( template == null ) {
iter = this.findJobs(QueryType.ALL, topic, 1, (Map<String, Object>[])null);
} else {
iter = this.findJobs(QueryType.ALL, topic, 1, template);
}
final Iterator<Job> i = iter.iterator();
if ( i.hasNext() ) {
return i.next();
}
return null;
}
/**
* @see org.apache.sling.event.jobs.JobManager#removeJobById(java.lang.String)
*/
@Override
public boolean removeJobById(final String jobId) {
return this.internalRemoveJobById(jobId, true);
}
private enum Operation {
LESS,
LESS_OR_EQUALS,
EQUALS,
GREATER_OR_EQUALS,
GREATER
}
/**
* @see org.apache.sling.event.jobs.JobManager#findJobs(org.apache.sling.event.jobs.JobManager.QueryType, java.lang.String, long, java.util.Map[])
*/
@Override
public Collection<Job> findJobs(final QueryType type,
final String topic,
final long limit,
final Map<String, Object>... templates) {
final boolean isHistoryQuery = type == QueryType.HISTORY
|| type == QueryType.SUCCEEDED
|| type == QueryType.CANCELLED
|| type == QueryType.DROPPED
|| type == QueryType.ERROR
|| type == QueryType.GIVEN_UP
|| type == QueryType.STOPPED;
final List<Job> result = new ArrayList<Job>();
final ResourceResolver resolver = this.configuration.createResourceResolver();
final StringBuilder buf = new StringBuilder(64);
try {
buf.append("/jcr:root");
buf.append(this.configuration.getJobsBasePathWithSlash());
buf.append("/element(*,");
buf.append(ResourceHelper.RESOURCE_TYPE_JOB);
buf.append(")[@");
buf.append(ISO9075.encode(ResourceHelper.PROPERTY_JOB_TOPIC));
if (topic != null) {
buf.append(" = '");
buf.append(topic);
buf.append("'");
}
// restricting on the type - history or unfinished
if ( isHistoryQuery ) {
buf.append(" and @");
buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED_STATE));
if ( type == QueryType.SUCCEEDED || type == QueryType.DROPPED || type == QueryType.ERROR || type == QueryType.GIVEN_UP || type == QueryType.STOPPED ) {
buf.append(" = '");
buf.append(type.name());
buf.append("'");
} else if ( type == QueryType.CANCELLED ) {
buf.append(" and (@");
buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED_STATE));
buf.append(" = '");
buf.append(QueryType.DROPPED.name());
buf.append("' or @");
buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED_STATE));
buf.append(" = '");
buf.append(QueryType.ERROR.name());
buf.append("' or @");
buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED_STATE));
buf.append(" = '");
buf.append(QueryType.GIVEN_UP.name());
buf.append("' or @");
buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED_STATE));
buf.append(" = '");
buf.append(QueryType.STOPPED.name());
buf.append("')");
}
} else {
buf.append(" and not(@");
buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED_STATE));
buf.append(")");
if ( type == QueryType.ACTIVE ) {
buf.append(" and @");
buf.append(ISO9075.encode(Job.PROPERTY_JOB_STARTED_TIME));
} else if ( type == QueryType.QUEUED ) {
buf.append(" and not(@");
buf.append(ISO9075.encode(Job.PROPERTY_JOB_STARTED_TIME));
buf.append(")");
}
}
if ( templates != null && templates.length > 0 ) {
int index = 0;
for (final Map<String,Object> template : templates) {
// skip empty templates
if ( template.size() == 0 ) {
continue;
}
if ( index == 0 ) {
buf.append(" and (");
} else {
buf.append(" or ");
}
buf.append('(');
final Iterator<Map.Entry<String, Object>> i = template.entrySet().iterator();
boolean first = true;
while ( i.hasNext() ) {
final Map.Entry<String, Object> current = i.next();
final String key = ISO9075.encode(current.getKey());
final char firstChar = key.length() > 0 ? key.charAt(0) : 0;
final String propName;
final Operation op;
if ( firstChar == '=' ) {
propName = key.substring(1);
op = Operation.EQUALS;
} else if ( firstChar == '<' ) {
final char secondChar = key.length() > 1 ? key.charAt(1) : 0;
if ( secondChar == '=' ) {
op = Operation.LESS_OR_EQUALS;
propName = key.substring(2);
} else {
op = Operation.LESS;
propName = key.substring(1);
}
} else if ( firstChar == '>' ) {
final char secondChar = key.length() > 1 ? key.charAt(1) : 0;
if ( secondChar == '=' ) {
op = Operation.GREATER_OR_EQUALS;
propName = key.substring(2);
} else {
op = Operation.GREATER;
propName = key.substring(1);
}
} else {
propName = key;
op = Operation.EQUALS;
}
if ( first ) {
first = false;
buf.append('@');
} else {
buf.append(" and @");
}
buf.append(propName);
buf.append(' ');
switch ( op ) {
case EQUALS : buf.append('=');break;
case LESS : buf.append('<'); break;
case LESS_OR_EQUALS : buf.append("<="); break;
case GREATER : buf.append('>'); break;
case GREATER_OR_EQUALS : buf.append(">="); break;
}
buf.append(" '");
buf.append(current.getValue());
buf.append("'");
}
buf.append(')');
index++;
}
if ( index > 0 ) {
buf.append(')');
}
}
buf.append("] order by @");
if ( isHistoryQuery ) {
buf.append(JobImpl.PROPERTY_FINISHED_DATE);
buf.append(" descending");
} else {
buf.append(Job.PROPERTY_JOB_CREATED);
buf.append(" ascending");
}
final Iterator<Resource> iter = resolver.findResources(buf.toString(), "xpath");
long count = 0;
while ( iter.hasNext() && (limit < 1 || count < limit) ) {
final Resource jobResource = iter.next();
// sanity check for the path
if ( this.configuration.isJob(jobResource.getPath()) ) {
final JobImpl job = Utility.readJob(logger, jobResource);
if ( job != null ) {
count++;
result.add(job);
}
}
}
} catch (final QuerySyntaxException qse) {
logger.warn("Query syntax wrong " + buf.toString(), qse);
} finally {
resolver.close();
}
return result;
}
/**
* Persist the job in the resource tree
* @param jobTopic The required job topic
* @param jobName The optional job name
* @param passedJobProperties The optional job properties
* @return The persisted job or <code>null</code>.
*/
private Job addJobInternal(final String jobTopic,
final Map<String, Object> jobProperties,
final List<String> errors) {
final QueueInfo info = this.configuration.getQueueConfigurationManager().getQueueInfo(jobTopic);
final TopologyCapabilities caps = this.configuration.getTopologyCapabilities();
info.targetId = (caps == null ? null : caps.detectTarget(jobTopic, jobProperties, info));
if ( logger.isDebugEnabled() ) {
if ( info.targetId != null ) {
logger.debug("Persisting job {} into queue {}, target={}", new Object[] {Utility.toString(jobTopic, jobProperties), info.queueName, info.targetId});
} else {
logger.debug("Persisting job {} into queue {}", Utility.toString(jobTopic, jobProperties), info.queueName);
}
}
final ResourceResolver resolver = this.configuration.createResourceResolver();
try {
final JobImpl job = this.writeJob(resolver,
jobTopic,
jobProperties,
info);
if ( info.targetId != null ) {
this.configuration.getAuditLogger().debug("ASSIGN OK {} : {}",
info.targetId, job.getId());
} else {
this.configuration.getAuditLogger().debug("UNASSIGN OK : {}",
job.getId());
}
return job;
} catch (final PersistenceException re ) {
// something went wrong, so let's log it
this.logger.error("Exception during persisting new job '" + Utility.toString(jobTopic, jobProperties) + "'", re);
} finally {
resolver.close();
}
if ( errors != null ) {
errors.add("Unable to persist new job.");
}
return null;
}
/**
* Write a job to the resource tree.
* @param resolver The resolver resolver
* @param event The event
* @param info The queue information (queue name etc.)
* @throws PersistenceException
*/
private JobImpl writeJob(final ResourceResolver resolver,
final String jobTopic,
final Map<String, Object> jobProperties,
final QueueInfo info)
throws PersistenceException {
final String jobId = this.configuration.getUniqueId(jobTopic);
final String path = this.configuration.getUniquePath(info.targetId, jobTopic, jobId, jobProperties);
// create properties
final Map<String, Object> properties = new HashMap<String, Object>();
if ( jobProperties != null ) {
for(final Map.Entry<String, Object> entry : jobProperties.entrySet() ) {
final String propName = entry.getKey();
if ( !ResourceHelper.ignoreProperty(propName) ) {
properties.put(propName, entry.getValue());
}
}
}
properties.put(ResourceHelper.PROPERTY_JOB_ID, jobId);
properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, jobTopic);
properties.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueConfiguration.getName());
properties.put(Job.PROPERTY_JOB_RETRY_COUNT, 0);
properties.put(Job.PROPERTY_JOB_RETRIES, info.queueConfiguration.getMaxRetries());
properties.put(Job.PROPERTY_JOB_CREATED, Calendar.getInstance());
properties.put(JobImpl.PROPERTY_JOB_QUEUED, Calendar.getInstance());
properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, Environment.APPLICATION_ID);
if ( info.targetId != null ) {
properties.put(Job.PROPERTY_JOB_TARGET_INSTANCE, info.targetId);
} else {
properties.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
}
// create path and resource
properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_JOB);
if ( logger.isDebugEnabled() ) {
logger.debug("Storing new job {} at {}", Utility.toString(jobTopic, properties), path);
}
ResourceHelper.getOrCreateResource(resolver,
path,
properties);
// update property types - priority, add path and create job
properties.put(JobImpl.PROPERTY_RESOURCE_PATH, path);
return new JobImpl(jobTopic, jobId, properties);
}
/**
* @see org.apache.sling.event.jobs.JobManager#stopJobById(java.lang.String)
*/
@Override
public void stopJobById(final String jobId) {
this.stopJobById(jobId, true);
}
private void stopJobById(final String jobId, final boolean forward) {
final JobImpl job = (JobImpl)this.getJobById(jobId);
if ( job != null && !this.configuration.isStoragePath(job.getResourcePath()) ) {
// get the queue configuration
final QueueInfo queueInfo = this.configuration.getQueueConfigurationManager().getQueueInfo(job.getTopic());
final JobQueueImpl queue = (JobQueueImpl)this.qManager.getQueue(queueInfo.queueName);
boolean stopped = false;
if ( queue != null ) {
stopped = queue.stopJob(job);
}
if ( forward && !stopped ) {
// mark the job as stopped
final JobHandler jh = new JobHandler(job, null, this.configuration);
jh.finished(JobState.STOPPED, true, null);
}
}
}
/**
* @see org.apache.sling.event.jobs.JobManager#createJob(java.lang.String)
*/
@Override
public JobBuilder createJob(final String topic) {
return new JobBuilderImpl(this, topic);
}
/**
* @see org.apache.sling.event.jobs.JobManager#getScheduledJobs()
*/
@Override
public Collection<ScheduledJobInfo> getScheduledJobs() {
return this.jobScheduler.getScheduledJobs(null, -1, (Map<String, Object>[])null);
}
/**
* @see org.apache.sling.event.jobs.JobManager#getScheduledJobs()
*/
@Override
public Collection<ScheduledJobInfo> getScheduledJobs(final String topic,
final long limit,
final Map<String, Object>... templates) {
return this.jobScheduler.getScheduledJobs(topic, limit, templates);
}
/**
* Internal method to add a job
*/
public Job addJob(final String topic,
final Map<String, Object> properties,
final List<String> errors) {
final String errorMessage = Utility.checkJob(topic, properties);
if ( errorMessage != null ) {
logger.warn("{}", errorMessage);
if ( errors != null ) {
errors.add(errorMessage);
}
this.configuration.getAuditLogger().debug("ADD FAILED topic={}, properties={} : {}",
new Object[] {topic,
properties,
errorMessage});
return null;
}
final List<String> errorList = new ArrayList<String>();
Job result = this.addJobInternal(topic, properties, errorList);
if ( errors != null ) {
errors.addAll(errorList);
}
if ( result == null ) {
this.configuration.getAuditLogger().debug("ADD FAILED topic={}, properties={} : {}",
new Object[] {topic,
properties,
errorList});
} else {
this.configuration.getAuditLogger().debug("ADD OK topic={}, properties={} : {}",
new Object[] {topic,
properties,
result.getId()});
}
return result;
}
/**
* @see org.apache.sling.event.jobs.JobManager#retryJobById(java.lang.String)
*/
@Override
public Job retryJobById(final String jobId) {
final JobImpl job = (JobImpl)this.getJobById(jobId);
if ( job != null && this.configuration.isStoragePath(job.getResourcePath()) ) {
this.internalRemoveJobById(jobId, true);
return this.addJob(job.getTopic(), job.getProperties());
}
return null;
}
public JobSchedulerImpl getJobScheduler() {
return this.jobScheduler;
}
}