blob: d01f77fcd751745d27a1c4395e42e494b78b38c6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.event.impl.jobs.config;
import java.util.Arrays;
import java.util.Map;
import org.apache.sling.event.impl.support.TopicMatcher;
import org.apache.sling.event.impl.support.TopicMatcherHelper;
import org.apache.sling.event.jobs.QueueConfiguration;
import org.osgi.framework.Constants;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.Designate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.osgi.service.metatype.annotations.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Configuration of a single job processing queue.
 * <p>
 * The class is registered as a Declarative Services factory component (one
 * instance per configuration, a configuration is required). All values are
 * read from the {@link Config} object in {@link #activate(Map, Config)};
 * afterwards {@link #isValid()} reports whether the configuration is usable.
 * <p>
 * Instances are ordered by descending service ranking, see
 * {@link #compareTo(InternalQueueConfiguration)}.
 */
@Component(service=InternalQueueConfiguration.class,
    name="org.apache.sling.event.jobs.QueueConfiguration",
    configurationPolicy=ConfigurationPolicy.REQUIRE,
    property={
        Constants.SERVICE_VENDOR + "=The Apache Software Foundation"
    })
@Designate(ocd = InternalQueueConfiguration.Config.class, factory = true)
public class InternalQueueConfiguration
    implements QueueConfiguration, Comparable<InternalQueueConfiguration> {

    /**
     * Metatype description of a queue configuration.
     * <p>
     * For the option lists below the {@code value} of each option is what gets
     * stored in the configuration and is later parsed with
     * {@code Type.valueOf} / {@code ThreadPriority.valueOf} in
     * {@link InternalQueueConfiguration#activate(Map, Config)}; it therefore
     * has to be the enum constant name. The {@code label} is display text only.
     */
    @ObjectClassDefinition(name = "Apache Sling Job Queue Configuration",
        description="The configuration of a job processing queue.")
    public @interface Config {

        @AttributeDefinition(name="Name",
            description="The name of the queue. If matching is used the token {0} can be used to substitute the real value.")
        String queue_name();

        @AttributeDefinition(name="Topics",
            description="This value is required and lists the topics processed by "
                + "this queue. The value is a list of strings. If a string ends with a dot, "
                + "all topics in exactly this package match. If the string ends with a star, "
                + "all topics in this package and all subpackages match. If the string neither "
                + "ends with a dot nor with a star, this is assumed to define an exact topic.")
        String[] queue_topics();

        // value = enum constant name (parsed by Type.valueOf), label = display text
        @AttributeDefinition(name = "Type",
            description="The queue type.",
            options = {@Option(label="Parallel",value="UNORDERED"),
                       @Option(label="Ordered",value="ORDERED"),
                       @Option(label="Topic Round Robin",value="TOPIC_ROUND_ROBIN")})
        String queue_type() default "UNORDERED";

        // value = enum constant name (parsed by ThreadPriority.valueOf), label = display text
        @AttributeDefinition(
            name="Priority",
            description="The priority for the threads used by this queue. Default is norm.",
            options = {
                @Option(label="Norm",value="NORM"),
                @Option(label="Min",value="MIN"),
                @Option(label="Max",value="MAX")
            })
        String queue_priority() default ConfigurationConstants.DEFAULT_PRIORITY;

        @AttributeDefinition(name="Maximum Retries",
            description="The maximum number of times a failed job slated "
                + "for retries is actually retried. If a job has been retried this number of "
                + "times and still fails, it is not rescheduled and assumed to have failed. The "
                + "default value is 10.")
        int queue_retries() default ConfigurationConstants.DEFAULT_RETRIES;

        @AttributeDefinition(name="Retry Delay",
            description="The number of milliseconds to sleep between two "
                + "consecutive retries of a job which failed and was set to be retried. The "
                + "default value is 2 seconds. This value is only relevant if there is a single "
                + "failed job in the queue. If there are multiple failed jobs, each job is "
                + "retried in turn without an intervening delay.")
        long queue_retrydelay() default ConfigurationConstants.DEFAULT_RETRY_DELAY;

        @AttributeDefinition(name="Maximum Parallel Jobs",
            description="The maximum number of parallel jobs started for this queue. "
                + "A value of -1 is substituted with the number of available processors. "
                + "Positive integer values specify number of processors to use. Can be greater than number of processors. "
                + "A decimal number between 0.0 and 1.0 is treated as a fraction of available processors. "
                + "For example 0.5 means half of the available processors. For ordered queue types this value is ignored (always enforced to be 1).")
        double queue_maxparallel() default ConfigurationConstants.DEFAULT_MAX_PARALLEL;

        @AttributeDefinition(name="Keep History",
            description="If this option is enabled, successful finished jobs are kept "
                + "to provide a complete history.")
        boolean queue_keepJobs() default false;

        @AttributeDefinition(name="Prefer Creation Instance",
            description="If this option is enabled, the jobs are tried to "
                + "be run on the instance where the job was created.")
        boolean queue_preferRunOnCreationInstance() default false;

        @AttributeDefinition(name="Thread Pool Size",
            description="Optional configuration value for a thread pool to be used by "
                + "this queue. If this is value has a positive number of threads configuration, this queue uses "
                + "an own thread pool with the configured number of threads.")
        int queue_threadPoolSize() default 0;

        @AttributeDefinition(name="Ranking",
            description="Integer value defining the ranking of this queue configuration. "
                + "If more than one queue matches a job topic, the one with the highest ranking is used.")
        int service_ranking() default 0;

        // Internal Name hint for web console.
        String webconsole_configurationFactory_nameHint() default "Queue: {" + ConfigurationConstants.PROP_NAME + "}";
    }

    /** Logger. */
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /** The name of the queue. */
    private String name;

    /** The queue type; {@code null} if the configured value could not be parsed. */
    private Type type;

    /** Number of retries. */
    private int retries;

    /** Retry delay. */
    private long retryDelay;

    /** Thread priority. */
    private ThreadPriority priority;

    /** The maximum number of parallel processes (for non ordered queues) */
    private int maxParallelProcesses;

    /** The ordering. */
    private int serviceRanking;

    /** The matchers for topics. */
    private TopicMatcher[] matchers;

    /** The configured topics; {@code null} if no matchers could be built. */
    private String[] topics;

    /** Keep jobs. */
    private boolean keepJobs;

    /** Valid flag, computed at the end of {@link #activate(Map, Config)}. */
    private boolean valid = false;

    /** Optional thread pool size. */
    private int ownThreadPoolSize;

    /** Prefer creation instance. */
    private boolean preferCreationInstance;

    /** The service PID taken from the component properties, may be {@code null}. */
    private String pid;

    /**
     * Create a new configuration from a config.
     *
     * @param props  the component properties; only the service PID is read.
     *               May be {@code null}, in which case the PID is {@code null}.
     * @param config the typed queue configuration
     * @return a new, already activated configuration object
     */
    public static InternalQueueConfiguration fromConfiguration(final Map<String, Object> props, final Config config) {
        final InternalQueueConfiguration c = new InternalQueueConfiguration();
        c.activate(props, config);
        return c;
    }

    public InternalQueueConfiguration() {
        // nothing to do, see activate
    }

    /**
     * Create a new queue configuration by reading all values from the
     * configuration and computing the valid flag.
     *
     * @param props  the component properties; only the service PID is read.
     *               May be {@code null}.
     * @param config the typed queue configuration
     */
    @Activate
    protected void activate(final Map<String, Object> props, final Config config) {
        this.name = config.queue_name();

        // an unknown priority is not fatal: fall back to the default
        try {
            this.priority = ThreadPriority.valueOf(config.queue_priority());
        } catch ( final IllegalArgumentException iae) {
            logger.warn("Invalid value for queue priority. Using default instead of : {}", config.queue_priority());
            this.priority = ThreadPriority.valueOf(ConfigurationConstants.DEFAULT_PRIORITY);
        }

        // an unknown type is fatal: a null type marks the configuration invalid (see checkIsValid)
        try {
            this.type = Type.valueOf(config.queue_type());
        } catch ( final IllegalArgumentException iae) {
            logger.error("Invalid value for queue type configuration: {}", config.queue_type());
            this.type = null;
        }

        this.retries = config.queue_retries();
        this.retryDelay = config.queue_retrydelay();

        // Fractional values are treated as a fraction of the cores. int values are treated as number of cores, -1 == all available
        // Note: the value is based on the core count at startup. It will not change dynamically if core count changes.
        final double inMaxParallel = config.queue_maxparallel();
        logger.debug("Max parallel for queue {} is {}", this.name, inMaxParallel);
        this.maxParallelProcesses = computeMaxParallel(inMaxParallel, ConfigurationConstants.NUMBER_OF_PROCESSORS);

        // ignore parallel setting for ordered queues
        if ( this.type == Type.ORDERED ) {
            this.maxParallelProcesses = 1;
        }
        // logged after the ordered override so the reported value is the effective one
        logger.debug("Thread pool size for {} was set to {}", this.name, this.maxParallelProcesses);

        final String[] topicsParam = config.queue_topics();
        this.matchers = TopicMatcherHelper.buildMatchers(topicsParam);
        this.topics = this.matchers == null ? null : topicsParam;

        this.keepJobs = config.queue_keepJobs();
        this.serviceRanking = config.service_ranking();
        this.ownThreadPoolSize = config.queue_threadPoolSize();
        this.preferCreationInstance = config.queue_preferRunOnCreationInstance();
        // props may be absent when created through fromConfiguration
        this.pid = props == null ? null : (String) props.get(Constants.SERVICE_PID);

        this.valid = this.checkIsValid();
    }

    /**
     * Translate the configured "maximum parallel jobs" value into a thread count.
     * <ul>
     * <li>integral value {@code <= -1}: the number of cores</li>
     * <li>other integral value: that value (zero is logged and later rejected
     *     by {@link #checkIsValid()})</li>
     * <li>fraction strictly between 0.0 and 1.0: that fraction of the cores,
     *     rounded, but never less than one so that a small fraction on a
     *     machine with few cores does not silently disable the queue</li>
     * <li>anything else (other non-integral values, NaN, infinity): the
     *     number of cores, with a warning</li>
     * </ul>
     *
     * @param configured the configured value
     * @param cores      the number of cores to base the calculation on
     * @return the number of parallel processes
     */
    private int computeMaxParallel(final double configured, final int cores) {
        final boolean integral = configured == Math.floor(configured) && !Double.isInfinite(configured);
        if ( integral ) {
            if ( (int) configured == 0 ) {
                logger.warn("Max threads property for {} set to zero.", this.name);
            }
            return configured <= -1 ? cores : (int) configured;
        }
        if ( configured > 0.0 && configured < 1.0 ) {
            // fraction of the cores (rounded), at least one
            return Math.max(1, (int) Math.round(cores * configured));
        }
        logger.warn("Invalid queue max parallel value for queue {}. Using {}", this.name, cores);
        return cores;
    }

    /**
     * Check if this configuration is valid,
     * If it is invalid, it is ignored.
     * <p>
     * A configuration is valid if it has a type, at least one non-null topic
     * matcher, a non-empty name, a retry count of at least -1 and at least
     * one parallel process.
     */
    private boolean checkIsValid() {
        if ( type == null ) {
            return false;
        }
        boolean hasMatchers = false;
        if ( this.matchers != null ) {
            for(final TopicMatcher m : this.matchers ) {
                if ( m != null ) {
                    hasMatchers = true;
                    break;
                }
            }
        }
        if ( !hasMatchers ) {
            return false;
        }
        if ( name == null || name.length() == 0 ) {
            return false;
        }
        if ( retries < -1 ) {
            return false;
        }
        if ( maxParallelProcesses < 1 ) {
            return false;
        }
        return true;
    }

    /**
     * @return the valid flag computed during activation
     */
    public boolean isValid() {
        return this.valid;
    }

    /**
     * Check if the queue processes the event.
     * The matchers are asked in order; for the first one returning a non-null
     * result, the queue name with every <code>{0}</code> replaced by that
     * result is returned.
     * @param topic The topic of the event
     * @return The queue name or <code>null</code>
     */
    public String match(final String topic) {
        if ( this.matchers != null ) {
            for(final TopicMatcher m : this.matchers ) {
                if ( m != null ) {
                    final String rep = m.match(topic);
                    if ( rep != null ) {
                        return this.name.replace("{0}", rep);
                    }
                }
            }
        }
        return null;
    }

    /**
     * Return the name of the queue.
     */
    public String getName() {
        return this.name;
    }

    /**
     * @see org.apache.sling.event.jobs.QueueConfiguration#getRetryDelayInMs()
     */
    @Override
    public long getRetryDelayInMs() {
        return this.retryDelay;
    }

    /**
     * @see org.apache.sling.event.jobs.QueueConfiguration#getMaxRetries()
     */
    @Override
    public int getMaxRetries() {
        return this.retries;
    }

    /**
     * @see org.apache.sling.event.jobs.QueueConfiguration#getType()
     */
    @Override
    public Type getType() {
        return this.type;
    }

    /**
     * @see org.apache.sling.event.jobs.QueueConfiguration#getMaxParallel()
     */
    @Override
    public int getMaxParallel() {
        return this.maxParallelProcesses;
    }

    /**
     * @see org.apache.sling.event.jobs.QueueConfiguration#getTopics()
     */
    @Override
    public String[] getTopics() {
        return this.topics;
    }

    /**
     * @see org.apache.sling.event.jobs.QueueConfiguration#getRanking()
     */
    @Override
    public int getRanking() {
        return this.serviceRanking;
    }

    /**
     * @return the service PID read from the component properties, may be {@code null}
     */
    public String getPid() {
        return this.pid;
    }

    @Override
    public boolean isKeepJobs() {
        return this.keepJobs;
    }

    @Override
    public int getOwnThreadPoolSize() {
        return this.ownThreadPoolSize;
    }

    @Override
    public boolean isPreferRunOnCreationInstance() {
        return this.preferCreationInstance;
    }

    @Override
    public String toString() {
        return "Queue-Configuration(" + this.hashCode() + ") : {" +
            "name=" + this.name +
            ", type=" + this.type +
            ", topics=" + (this.matchers == null ? "[]" : Arrays.toString(this.matchers)) +
            ", maxParallelProcesses=" + this.maxParallelProcesses +
            ", retries=" + this.retries +
            ", retryDelayInMs=" + this.retryDelay +
            ", keepJobs=" + this.keepJobs +
            ", preferRunOnCreationInstance=" + this.preferCreationInstance +
            ", ownThreadPoolSize=" + this.ownThreadPoolSize +
            ", serviceRanking=" + this.serviceRanking +
            ", pid=" + this.pid +
            ", isValid=" + this.isValid() + "}";
    }

    /**
     * Order configurations by descending service ranking: the configuration
     * with the higher ranking sorts first.
     */
    @Override
    public int compareTo(final InternalQueueConfiguration other) {
        // arguments are swapped on purpose to get descending order
        return Integer.compare(other.serviceRanking, this.serviceRanking);
    }

    @Override
    public ThreadPriority getThreadPriority() {
        return this.priority;
    }
}