blob: b633dc2a2b0d00a3d6a95dfaf43f571f6822f0c1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.event.impl.jobs.config;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.discovery.TopologyEvent;
import org.apache.sling.discovery.TopologyEvent.Type;
import org.apache.sling.discovery.TopologyEventListener;
import org.apache.sling.discovery.commons.InitDelayingTopologyEventListener;
import org.apache.sling.event.impl.EnvironmentComponent;
import org.apache.sling.event.impl.jobs.tasks.CheckTopologyTask;
import org.apache.sling.event.impl.jobs.tasks.FindUnfinishedJobsTask;
import org.apache.sling.event.impl.jobs.tasks.UpgradeTask;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.serviceusermapping.ServiceUserMapped;
import org.osgi.framework.Constants;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferencePolicyOption;
import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.Designate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
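/**
 * Central configuration of the job handling.
 * It holds the repository paths used for storing jobs, the current topology
 * capabilities and the listeners that are informed when processing starts or stops.
 */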
@Component(immediate=true,
service=JobManagerConfiguration.class,
name="org.apache.sling.event.impl.jobs.jcr.PersistenceHandler",
property = {
Constants.SERVICE_VENDOR + "=The Apache Software Foundation",
JobManagerConfiguration.PROPERTY_REPOSITORY_PATH + "=" + JobManagerConfiguration.DEFAULT_REPOSITORY_PATH,
JobManagerConfiguration.PROPERTY_SCHEDULED_JOBS_PATH + "=" + JobManagerConfiguration.DEFAULT_SCHEDULED_JOBS_PATH,
JobManagerConfiguration.PROPERTY_BACKGROUND_LOAD_DELAY + ":Long=" + JobManagerConfiguration.DEFAULT_BACKGROUND_LOAD_DELAY
})
@Designate(ocd = JobManagerConfiguration.Config.class)
public class JobManagerConfiguration {
/**
 * Metatype definition of the job manager configuration.
 * The values are read in {@code update} and {@code activate}.
 */
@ObjectClassDefinition(name = "Apache Sling Job Manager",
description="This is the central service of the job handling.")
public @interface Config {
/** Whether job distribution is disabled; defaults to {@code false}. */
@AttributeDefinition(name = "Disable Distribution",
description="If the distribution is disabled, all jobs will be processed on the leader only! "
+ "Please use this switch with care.")
boolean job_consumermanager_disableDistribution() default false;
/** Startup delay in seconds (per the description below); defaults to 30. */
@AttributeDefinition(name = "Startup Delay",
description="Specify amount in seconds that job manager waits on startup before starting with job handling. "
+ "This can be used to allow enough time to restart a cluster before jobs are eventually reassigned.")
long startup_delay() default 30;
/** Clean-up interval in minutes for removed jobs; defaults to 2880 (48h), 0 disables it per the description. */
@AttributeDefinition(name = "Clean-up removed jobs period",
description = "Specify the periodic interval in minutes (default is 48h - use 0 to disable) after which " +
"removed jobs (ERROR or DROPPED) should be cleaned from the repository.")
int cleanup_period() default 2880;
}
/** Main logger, shared with other classes through {@link #getMainLogger()}. */
private final Logger logger = LoggerFactory.getLogger("org.apache.sling.event.impl.jobs");
/** Audit logger, exposed through {@link #getAuditLogger()}. */
private final Logger auditLogger = LoggerFactory.getLogger("org.apache.sling.event.jobs.audit");
/** Default resource path for jobs. */
public static final String DEFAULT_REPOSITORY_PATH = "/var/eventing/jobs";
/** Default background load delay (the value is multiplied by 1000 to get milliseconds when used). */
public static final long DEFAULT_BACKGROUND_LOAD_DELAY = 10;
/** Default resource path for scheduled jobs. */
public static final String DEFAULT_SCHEDULED_JOBS_PATH = "/var/eventing/scheduled-jobs";
/** Configuration property for the path where all jobs are stored. */
public static final String PROPERTY_REPOSITORY_PATH = "repository.path";
/** Configuration property for the background load delay in seconds. */
public static final String PROPERTY_BACKGROUND_LOAD_DELAY = "load.delay";
/** Configuration property for the scheduled jobs path. */
public static final String PROPERTY_SCHEDULED_JOBS_PATH = "job.scheduled.jobs.path";
/** The jobs base path with a trailing slash; set in activate. */
private String jobsBasePathWithSlash;
/** The base path for assigned jobs (base path + "assigned"). */
private String assignedJobsPath;
/** The base path for unassigned jobs (base path + "unassigned"). */
private String unassignedJobsPath;
/** The base path for jobs assigned to the current instance (assigned path + application id). */
private String localJobsPath;
/** The base path for jobs assigned to the current instance - ending with a slash. */
private String localJobsPathWithSlash;
/** Path below the jobs base path named "anon"; exposed as "previous version" path. */
private String previousVersionAnonPath;
/** Path below the jobs base path named "identified"; exposed as "previous version" path. */
private String previousVersionIdentifiedPath;
/** Delay in seconds before listeners are notified after a non-init topology event. */
private volatile long backgroundLoadDelay;
/** Configured startup delay; a value greater than 0 enables the delaying topology listener in activate. */
private volatile long startupDelay;
/** Delaying topology listener; only set if the startup delay is greater than 0. */
private volatile InitDelayingTopologyEventListener startupDelayListener;
/** Whether job distribution is disabled by configuration. */
private volatile boolean disabledDistribution;
/** Storage path for cancelled jobs (base path + "cancelled"). */
private String storedCancelledJobsPath;
/** Storage path for successfully finished jobs (base path + "finished"). */
private String storedSuccessfulJobsPath;
/** The resource path where scheduled jobs are stored. */
private String scheduledJobsPath;
/** The resource path where scheduled jobs are stored - ending with a slash. */
private String scheduledJobsPathWithSlash;
/** Configured clean-up period for removed jobs (value of {@code cleanup_period}). */
private volatile int historyCleanUpRemovedJobs;
/** Listeners notified when processing starts or stops; guarded by synchronizing on the list itself. */
private final List<ConfigurationChangeListener> listeners = new ArrayList<>();
/** The environment component. */
@Reference
private EnvironmentComponent environment;
/** Factory used to create service resource resolvers. */
@Reference(policyOption=ReferencePolicyOption.GREEDY)
private ResourceResolverFactory resourceResolverFactory;
/** The queue configuration manager, exposed through {@link #getQueueConfigurationManager()}. */
@Reference
private QueueConfigurationManager queueConfigManager;
/** Service user mapping reference; not read in the visible code - presumably ensures the service user is available (TODO confirm). */
@Reference(policyOption=ReferencePolicyOption.GREEDY)
private ServiceUserMapped serviceUserMapped;
/** Is this still active? Set to {@code true} at the end of activate and to {@code false} in deactivate. */
private final AtomicBoolean active = new AtomicBoolean(false);
/** The current topology capabilities; {@code null} while processing is stopped. */
private volatile TopologyCapabilities topologyCapabilities;
/**
 * Activates the component: applies the configuration, derives all repository
 * paths from it, makes sure the base resources exist and - if a startup delay
 * is configured - installs the delaying topology event listener.
 *
 * @param props The component properties (repository path, scheduled jobs path, load delay)
 * @param config The typed configuration
 * @throws RuntimeException If the default paths can't be created
 */
@Activate
protected void activate(final Map<String, Object> props, final Config config) {
    this.update(props, config);

    // all job related paths hang off the configured base path
    final String basePath = PropertiesUtil.toString(props.get(PROPERTY_REPOSITORY_PATH),
            DEFAULT_REPOSITORY_PATH);
    this.jobsBasePathWithSlash = basePath + '/';

    this.assignedJobsPath = this.jobsBasePathWithSlash + "assigned";
    this.unassignedJobsPath = this.jobsBasePathWithSlash + "unassigned";
    this.previousVersionAnonPath = this.jobsBasePathWithSlash + "anon";
    this.previousVersionIdentifiedPath = this.jobsBasePathWithSlash + "identified";
    this.storedCancelledJobsPath = this.jobsBasePathWithSlash + "cancelled";
    this.storedSuccessfulJobsPath = this.jobsBasePathWithSlash + "finished";

    this.localJobsPath = (this.assignedJobsPath + "/").concat(Environment.APPLICATION_ID);
    this.localJobsPathWithSlash = this.localJobsPath + "/";

    this.scheduledJobsPath = PropertiesUtil.toString(props.get(PROPERTY_SCHEDULED_JOBS_PATH),
            DEFAULT_SCHEDULED_JOBS_PATH);
    this.scheduledJobsPathWithSlash = this.scheduledJobsPath + "/";

    this.historyCleanUpRemovedJobs = config.cleanup_period();

    // create initial resources
    final ResourceResolver resolver = this.createResourceResolver();
    try {
        final String[] initialPaths = { this.getLocalJobsPath(), this.getUnassignedJobsPath() };
        for (final String initialPath : initialPaths) {
            ResourceHelper.getOrCreateBasePath(resolver, initialPath);
        }
    } catch ( final PersistenceException pe ) {
        logger.error("Unable to create default paths: " + pe.getMessage(), pe);
        throw new RuntimeException(pe);
    } finally {
        resolver.close();
    }
    this.active.set(true);

    // SLING-5560 : use an InitDelayingTopologyEventListener
    if (this.startupDelay <= 0) {
        logger.debug("activate: job manager will start without delay. ({}:{})", config.startup_delay(), this.startupDelay);
    } else {
        logger.debug("activate: job manager will start in {} sec. ({})", this.startupDelay, config.startup_delay());
        final TopologyEventListener delegate = new TopologyEventListener() {
            @Override
            public void handleTopologyEvent(TopologyEvent event) {
                doHandleTopologyEvent(event);
            }
        };
        this.startupDelayListener = new InitDelayingTopologyEventListener(startupDelay, delegate, logger);
    }
}
/**
 * Applies a new configuration. Also invoked from activate.
 *
 * @param props The component properties (source of the background load delay)
 * @param config The typed configuration
 */
@Modified
protected void update(final Map<String, Object> props, final Config config) {
    final Object configuredLoadDelay = props.get(PROPERTY_BACKGROUND_LOAD_DELAY);

    this.disabledDistribution = config.job_consumermanager_disableDistribution();
    this.backgroundLoadDelay = PropertiesUtil.toLong(configuredLoadDelay, DEFAULT_BACKGROUND_LOAD_DELAY);
    // SLING-5560: a changed startup delay has no immediate effect, as the
    // delaying listener is only created in activate - the new value is
    // picked up on the next activation.
    this.startupDelay = config.startup_delay();
}
/**
 * Deactivates the component: marks it inactive, disposes the delaying
 * topology listener (if any) and stops job processing.
 */
@Deactivate
protected void deactivate() {
    this.active.set(false);
    final InitDelayingTopologyEventListener delayListener = this.startupDelayListener;
    if ( delayListener != null ) {
        delayListener.dispose();
        this.startupDelayListener = null;
    }
    this.stopProcessing();
}
/**
 * Get the configured clean-up period for removed jobs.
 * @return The value of the {@code cleanup_period} configuration read during activation.
 */
public int getHistoryCleanUpRemovedJobs() {
    return historyCleanUpRemovedJobs;
}
/**
 * Check whether the component is currently activated.
 * @return {@code true} between the end of activate and the start of deactivate.
 */
public boolean isActive() {
    final boolean stillActive = active.get();
    return stillActive;
}
/**
 * Create a new resource resolver for reading and writing the resource tree.
 * The resolver needs to be closed by the client.
 * @return A resource resolver or {@code null} if no resource resolver factory is available.
 * @throws RuntimeException if the resolver can't be created.
 */
public ResourceResolver createResourceResolver() {
    // take a snapshot of the field and use only the snapshot afterwards,
    // so the null check and the call always refer to the same factory
    final ResourceResolverFactory factory = this.resourceResolverFactory;
    if ( factory == null ) {
        return null;
    }
    try {
        return factory.getServiceResourceResolver(null);
    } catch ( final LoginException le ) {
        logger.error("Unable to create new resource resolver: " + le.getMessage(), le);
        throw new RuntimeException(le);
    }
}
/**
 * Get the topology capabilities currently in use.
 * @return The capabilities or {@code null} while processing is stopped
 */
public TopologyCapabilities getTopologyCapabilities() {
    return topologyCapabilities;
}
/**
 * Get the queue configuration manager.
 * @return The referenced queue configuration manager.
 */
public QueueConfigurationManager getQueueConfigurationManager() {
    return queueConfigManager;
}
/**
 * Get the logger shared by the job handling classes.
 * @return The main logger.
 */
public Logger getMainLogger() {
    return logger;
}
/**
 * Get the resource path below which all assigned jobs are stored.
 * @return The path - does not end with a slash.
 */
public String getAssginedJobsPath() {
    return assignedJobsPath;
}
/**
 * Get the resource path below which all unassigned jobs are stored.
 * @return The path - does not end with a slash.
 */
public String getUnassignedJobsPath() {
    return unassignedJobsPath;
}
/**
 * Get the resource path below which the jobs assigned to the current instance are stored.
 * @return The path - does not end with a slash
 */
public String getLocalJobsPath() {
    return localJobsPath;
}
/** Counter appended to the application id in {@link #getUniqueId(String)} to make generated job ids unique. */
private final AtomicLong jobCounter = new AtomicLong(0);
/**
 * Create a unique job path (folder and name) for the job.
 * The path is located below the assigned jobs of {@code targetId} if a target
 * is given, otherwise below the unassigned jobs.
 *
 * @param targetId The id of the target instance or {@code null} for an unassigned job
 * @param topic The job topic; slashes are replaced with dots
 * @param jobId The job id
 * @param jobProperties The job properties (not used for building the path)
 * @return The job path
 */
public String getUniquePath(final String targetId,
        final String topic,
        final String jobId,
        final Map<String, Object> jobProperties) {
    final String topicName = topic.replace('/', '.');
    final String parentPath = (targetId == null)
            ? this.unassignedJobsPath
            : this.assignedJobsPath + '/' + targetId;
    return parentPath + '/' + topicName + '/' + jobId;
}
/**
 * Create a unique job id.
 * The id has the form {@code year/month/day/hour/minute/applicationId_counter},
 * using the current time and an increasing counter.
 *
 * @param jobTopic The job topic (not used for building the id)
 * @return The new job id
 */
public String getUniqueId(final String jobTopic) {
    final Calendar timestamp = Calendar.getInstance();
    return new StringBuilder()
            .append(timestamp.get(Calendar.YEAR)).append('/')
            // Calendar months are zero based
            .append(timestamp.get(Calendar.MONTH) + 1).append('/')
            .append(timestamp.get(Calendar.DAY_OF_MONTH)).append('/')
            .append(timestamp.get(Calendar.HOUR_OF_DAY)).append('/')
            .append(timestamp.get(Calendar.MINUTE)).append('/')
            .append(Environment.APPLICATION_ID).append('_')
            .append(jobCounter.getAndIncrement())
            .toString();
}
/**
 * Check whether the path is below the jobs assigned to the current instance.
 * @param jobPath The path to check, may be {@code null}
 * @return {@code true} if the path starts with the local jobs path (including the trailing slash)
 */
public boolean isLocalJob(final String jobPath) {
    if ( jobPath == null ) {
        return false;
    }
    return jobPath.startsWith(this.localJobsPathWithSlash);
}
/**
 * Check whether the path is below the jobs base path.
 * A {@code null} path is never a job path (same handling as in {@link #isLocalJob(String)}).
 * @param jobPath The path to check, may be {@code null}
 * @return {@code true} if the path starts with the jobs base path (including the trailing slash)
 */
public boolean isJob(final String jobPath) {
    return jobPath != null && jobPath.startsWith(this.jobsBasePathWithSlash);
}
/**
 * Get the base path of all jobs.
 * @return The path - ending with a slash.
 */
public String getJobsBasePathWithSlash() {
    return jobsBasePathWithSlash;
}
/**
 * Get the "anon" path below the jobs base path.
 * @return The path - does not end with a slash.
 */
public String getPreviousVersionAnonPath() {
    return previousVersionAnonPath;
}
/**
 * Get the "identified" path below the jobs base path.
 * @return The path - does not end with a slash.
 */
public String getPreviousVersionIdentifiedPath() {
    return previousVersionIdentifiedPath;
}
/**
 * Check whether job distribution is disabled by configuration.
 * @return {@code true} if distribution is disabled.
 */
public boolean disableDistribution() {
    return disabledDistribution;
}
/**
 * Get the storage path for cancelled jobs.
 * @return The path - does not end with a slash.
 */
public String getStoredCancelledJobsPath() {
    return storedCancelledJobsPath;
}
/**
 * Get the storage path for successfully finished jobs.
 * @return The path - does not end with a slash.
 */
public String getStoredSuccessfulJobsPath() {
    return storedSuccessfulJobsPath;
}
/**
 * Build the storage path for a finished job.
 * Successful jobs are stored below the successful jobs path, all others
 * below the cancelled jobs path.
 *
 * @param topic Topic of the finished job; slashes are replaced with dots
 * @param jobId The job id of the finished job.
 * @param isSuccess Whether processing was successful or not
 * @return The complete storage path
 */
public String getStoragePath(final String topic, final String jobId, final boolean isSuccess) {
    final String topicName = topic.replace('/', '.');
    final String storageRoot = isSuccess ? this.storedSuccessfulJobsPath : this.storedCancelledJobsPath;
    return storageRoot + '/' + topicName + '/' + jobId;
}
/**
 * Check whether the path denotes a stored (cancelled or successful) job.
 * @param path The path to check
 * @return {@code true} if the path starts with one of the two storage paths
 */
public boolean isStoragePath(final String path) {
    if ( path.startsWith(this.storedCancelledJobsPath) ) {
        return true;
    }
    return path.startsWith(this.storedSuccessfulJobsPath);
}
/**
 * Get the path where scheduled jobs are stored.
 * @param slash If {@code true} the path with a trailing slash is returned, otherwise the plain path.
 * @return The path for the scheduled jobs
 */
public String getScheduledJobsPath(final boolean slash) {
    if ( slash ) {
        return this.scheduledJobsPathWithSlash;
    }
    return this.scheduledJobsPath;
}
/**
 * Stop processing: deactivates the current topology capabilities (if any),
 * clears them and informs all listeners.
 */
private void stopProcessing() {
    logger.debug("Stopping job processing...");
    final TopologyCapabilities current = this.topologyCapabilities;
    if ( current == null ) {
        // nothing is running
        logger.debug("Job processing stopped");
        return;
    }
    // deactivating the old capabilities stops all background processes
    current.deactivate();
    this.topologyCapabilities = null;
    // tell all listeners that processing has stopped
    this.notifyListeners();
    logger.debug("Job processing stopped");
}
/**
 * Start processing with new topology capabilities.
 * On {@code TOPOLOGY_INIT} the maintenance tasks are run synchronously and the
 * listeners are notified immediately. For all other event types the listeners
 * are notified after the background load delay - and only if the given
 * capabilities are still the current, active ones at that time.
 *
 * @param eventType The event type
 * @param newCaps The new capabilities
 */
private void startProcessing(final Type eventType, final TopologyCapabilities newCaps) {
    logger.debug("Starting job processing...");
    // create new capabilities and update view
    this.topologyCapabilities = newCaps;
    // before we propagate the new topology we do some maintenance
    if ( eventType == Type.TOPOLOGY_INIT ) {
        final UpgradeTask task = new UpgradeTask(this);
        task.run();
        final FindUnfinishedJobsTask rt = new FindUnfinishedJobsTask(this);
        rt.run();
        final CheckTopologyTask mt = new CheckTopologyTask(this);
        mt.fullRun();
        notifyListeners();
    } else {
        // and run checker again in some seconds (if leader)
        // notify listeners afterwards
        final Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    if ( newCaps == topologyCapabilities && newCaps.isActive() ) {
                        // start listeners
                        notifyListeners();
                        if ( newCaps.isLeader() && newCaps.isActive() ) {
                            final CheckTopologyTask mt = new CheckTopologyTask(JobManagerConfiguration.this);
                            mt.fullRun();
                        }
                    }
                } finally {
                    // the timer is used for this single task only: cancel it so
                    // its (non-daemon) thread terminates right away instead of
                    // lingering until the timer is garbage collected
                    timer.cancel();
                }
            }
        }, this.backgroundLoadDelay * 1000);
    }
    logger.debug("Job processing started");
}
/**
 * Notify all listeners whether processing is currently active,
 * i.e. whether topology capabilities are set.
 */
private void notifyListeners() {
    synchronized ( this.listeners ) {
        final boolean processingActive = this.topologyCapabilities != null;
        for (final ConfigurationChangeListener listener : this.listeners) {
            listener.configurationChanged(processingActive);
        }
    }
}
/**
 * This method is invoked asynchronously from the TopologyHandler.
 * Therefore this method can't be invoked concurrently.
 * If a startup delay listener is set, the event is passed to it, otherwise it is handled directly.
 * @param event The topology event
 * @see org.apache.sling.discovery.TopologyEventListener#handleTopologyEvent(org.apache.sling.discovery.TopologyEvent)
 */
public void handleTopologyEvent(TopologyEvent event) {
    // read the volatile field only once: deactivate() may reset it to null
    // between a null check and the actual call
    final InitDelayingTopologyEventListener delayListener = this.startupDelayListener;
    if ( delayListener != null ) {
        // with startup.delay > 0
        delayListener.handleTopologyEvent(event);
    } else {
        // classic (startup.delay <= 0)
        this.logger.debug("Received topology event {}", event);
        doHandleTopologyEvent(event);
    }
}
/**
 * Handle a topology event.
 * {@code TOPOLOGY_CHANGING} stops processing. {@code TOPOLOGY_INIT},
 * {@code TOPOLOGY_CHANGED} and {@code PROPERTIES_CHANGED} restart processing
 * with capabilities created from the new view; for {@code PROPERTIES_CHANGED}
 * the stop is skipped if the current capabilities report the same instances.
 * @param event The topology event
 */
void doHandleTopologyEvent(final TopologyEvent event) {
    // read the event type once and use it consistently
    final TopologyEvent.Type eventType = event.getType();
    if ( eventType == Type.TOPOLOGY_CHANGING ) {
        this.stopProcessing();
        return;
    }
    if ( eventType != Type.TOPOLOGY_INIT
         && eventType != Type.TOPOLOGY_CHANGED
         && eventType != Type.PROPERTIES_CHANGED ) {
        return;
    }

    // check if there is a change of properties which doesn't affect us
    // but we need to use the new view !
    boolean stopProcessing = true;
    if ( eventType == Type.PROPERTIES_CHANGED ) {
        final Map<String, String> newAllInstances = TopologyCapabilities.getAllInstancesMap(event.getNewView());
        // snapshot of the volatile field: it may be reset to null concurrently
        // (e.g. by deactivate) between the null check and the comparison
        final TopologyCapabilities currentCaps = this.topologyCapabilities;
        if ( currentCaps != null && currentCaps.isSame(newAllInstances) ) {
            logger.debug("No changes in capabilities - updating topology capabilities with new view");
            stopProcessing = false;
        }
    }
    if ( stopProcessing ) {
        this.stopProcessing();
    }
    this.startProcessing(eventType, new TopologyCapabilities(event.getNewView(), this));
}
/**
 * Register a listener and immediately inform it about the current state.
 * @param service Listener to notify about changes.
 */
public void addListener(final ConfigurationChangeListener service) {
    synchronized ( this.listeners ) {
        this.listeners.add(service);
        final boolean processingActive = this.topologyCapabilities != null;
        service.configurationChanged(processingActive);
    }
}
/**
 * Unregister a listener; it is no longer notified afterwards.
 * @param service The listener to remove.
 */
public void removeListener(final ConfigurationChangeListener service) {
    synchronized ( listeners ) {
        listeners.remove(service);
    }
}
/** Jobs to retry, keyed by job id; all access is guarded by synchronizing on the map itself. */
private final Map<String, Job> retryList = new HashMap<>();
/**
 * Add a job to the retry list; an entry with the same job id is replaced.
 * @param job The job to add
 */
public void addJobToRetryList(final Job job) {
    final String jobId = job.getId();
    synchronized ( this.retryList ) {
        this.retryList.put(jobId, job);
    }
}
/**
 * Remove all jobs from the retry list.
 * @return A new list containing the jobs which have been removed
 */
public List<Job> clearJobRetryList() {
    final List<Job> drained;
    synchronized ( this.retryList ) {
        drained = new ArrayList<>(this.retryList.values());
        this.retryList.clear();
    }
    return drained;
}
/**
 * Remove a job from the retry list.
 * @param job The job to remove (looked up by its id)
 * @return {@code true} if an entry for the job id was removed
 */
public boolean removeJobFromRetryList(final Job job) {
    final Job removed;
    synchronized ( this.retryList ) {
        removed = this.retryList.remove(job.getId());
    }
    return removed != null;
}
/**
 * Look up a job in the retry list.
 * @param jobId The job id
 * @return The job or {@code null} if the retry list has no entry for the id
 */
public Job getJobFromRetryList(final String jobId) {
    final Job found;
    synchronized ( this.retryList ) {
        found = this.retryList.get(jobId);
    }
    return found;
}
/**
 * Get the logger used for logging actions for auditing.
 * @return The audit logger
 */
public Logger getAuditLogger() {
    return auditLogger;
}
}