blob: 08660e8b49313bb1ea59c784bed33cb841c13dd3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.event.impl.jobs;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.PropertyUnbounded;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.References;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.discovery.PropertyProvider;
import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.impl.support.TopicMatcher;
import org.apache.sling.event.impl.support.TopicMatcherHelper;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.apache.sling.event.jobs.consumer.JobConsumer.JobResult;
import org.apache.sling.event.jobs.consumer.JobExecutionContext;
import org.apache.sling.event.jobs.consumer.JobExecutionResult;
import org.apache.sling.event.jobs.consumer.JobExecutor;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This component manages/keeps track of all job consumer services.
*/
@Component(label="Apache Sling Job Consumer Manager",
description="The consumer manager controls the job consumer (= processors). "
+ "It can be used to temporarily disable job processing on the current instance. Other instances "
+ "in a cluster are not affected.",
metatype=true)
@Service(value=JobConsumerManager.class)
@References({
@Reference(referenceInterface=JobConsumer.class,
cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE,
policy=ReferencePolicy.DYNAMIC),
@Reference(referenceInterface=JobExecutor.class,
cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE,
policy=ReferencePolicy.DYNAMIC)
})
@Property(name="org.apache.sling.installer.configuration.persist", boolValue=false,
label="Distribute config",
description="If this is disabled, the configuration is not persisted on save in the cluster and is "
+ "only used on the current instance. This option should always be disabled!")
public class JobConsumerManager {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Property(unbounded=PropertyUnbounded.ARRAY, value = "*",
label="Topic Whitelist",
description="This is a list of topics which currently should be "
+ "processed by this instance. Leaving it empty, all job consumers are disabled. Putting a '*' as "
+ "one entry, enables all job consumers. Adding separate topics enables job consumers for exactly "
+ "this topic.")
private static final String PROPERTY_WHITELIST = "job.consumermanager.whitelist";
@Property(unbounded=PropertyUnbounded.ARRAY,
label="Topic Blacklist",
description="This is a list of topics which currently shouldn't be "
+ "processed by this instance. Leaving it empty, all job consumers are enabled. Putting a '*' as "
+ "one entry, disables all job consumers. Adding separate topics disables job consumers for exactly "
+ "this topic.")
private static final String PROPERTY_BLACKLIST = "job.consumermanager.blacklist";
/** The map with the consumers, keyed by topic, sorted by service ranking. */
private final Map<String, List<ConsumerInfo>> topicToConsumerMap = new HashMap<String, List<ConsumerInfo>>();
/** ServiceRegistration for propagation. */
private ServiceRegistration propagationService;
private String topics;
private TopicMatcher[] whitelistMatchers;
private TopicMatcher[] blacklistMatchers;
private volatile long changeCount;
private BundleContext bundleContext;
private final Map<String, Object[]> listenerMap = new HashMap<String, Object[]>();
private Dictionary<String, Object> getRegistrationProperties() {
final Dictionary<String, Object> serviceProps = new Hashtable<String, Object>();
serviceProps.put(PropertyProvider.PROPERTY_PROPERTIES, TopologyCapabilities.PROPERTY_TOPICS);
// we add a changing property to the service registration
// to make sure a modification event is really sent
synchronized ( this ) {
serviceProps.put("changeCount", this.changeCount++);
}
return serviceProps;
}
@Activate
protected void activate(final BundleContext bc, final Map<String, Object> props) {
this.bundleContext = bc;
this.modified(bc, props);
}
@Modified
protected void modified(final BundleContext bc, final Map<String, Object> props) {
final boolean wasEnabled = this.propagationService != null;
this.whitelistMatchers = TopicMatcherHelper.buildMatchers(PropertiesUtil.toStringArray(props.get(PROPERTY_WHITELIST)));
this.blacklistMatchers = TopicMatcherHelper.buildMatchers(PropertiesUtil.toStringArray(props.get(PROPERTY_BLACKLIST)));
final boolean enable = this.whitelistMatchers != null && this.blacklistMatchers != TopicMatcherHelper.MATCH_ALL;
if ( wasEnabled != enable ) {
synchronized ( this.topicToConsumerMap ) {
this.calculateTopics(enable);
}
if ( enable ) {
logger.debug("Registering property provider with: {}", this.topics);
this.propagationService = bc.registerService(PropertyProvider.class.getName(),
new PropertyProvider() {
@Override
public String getProperty(final String name) {
if ( TopologyCapabilities.PROPERTY_TOPICS.equals(name) ) {
return topics;
}
return null;
}
}, this.getRegistrationProperties());
} else {
logger.debug("Unregistering property provider with");
this.propagationService.unregister();
this.propagationService = null;
}
} else if ( enable ) {
// update properties
synchronized ( this.topicToConsumerMap ) {
this.calculateTopics(true);
}
logger.debug("Updating property provider with: {}", this.topics);
this.propagationService.setProperties(this.getRegistrationProperties());
}
}
@Deactivate
protected void deactivate() {
if ( this.propagationService != null ) {
this.propagationService.unregister();
this.propagationService = null;
}
this.bundleContext = null;
synchronized ( this.topicToConsumerMap ) {
this.topicToConsumerMap.clear();
this.listenerMap.clear();
}
}
/**
* Get the executor for the topic.
* @param topic The job topic
* @return A consumer or <code>null</code>
*/
public JobExecutor getExecutor(final String topic) {
synchronized ( this.topicToConsumerMap ) {
final List<ConsumerInfo> consumers = this.topicToConsumerMap.get(topic);
if ( consumers != null ) {
return consumers.get(0).getExecutor(this.bundleContext);
}
int pos = topic.lastIndexOf('/');
if ( pos > 0 ) {
final String category = topic.substring(0, pos + 1).concat("*");
final List<ConsumerInfo> categoryConsumers = this.topicToConsumerMap.get(category);
if ( categoryConsumers != null ) {
return categoryConsumers.get(0).getExecutor(this.bundleContext);
}
// search deep consumers (since 1.2 of the consumer package)
do {
final String subCategory = topic.substring(0, pos + 1).concat("**");
final List<ConsumerInfo> subCategoryConsumers = this.topicToConsumerMap.get(subCategory);
if ( subCategoryConsumers != null ) {
return subCategoryConsumers.get(0).getExecutor(this.bundleContext);
}
pos = topic.lastIndexOf('/', pos - 1);
} while ( pos > 0 );
}
}
return null;
}
public void registerListener(final String key, final JobExecutor consumer, final JobExecutionContext handler) {
synchronized ( this.topicToConsumerMap ) {
this.listenerMap.put(key, new Object[] {consumer, handler});
}
}
public void unregisterListener(final String key) {
synchronized ( this.topicToConsumerMap ) {
this.listenerMap.remove(key);
}
}
/**
* Return the topics information of this instance.
*/
public String getTopics() {
return this.topics;
}
/**
* Bind a new consumer
* @param serviceReference The service reference to the consumer.
*/
protected void bindJobConsumer(final ServiceReference serviceReference) {
this.bindService(serviceReference, true);
}
/**
* Unbind a consumer
* @param serviceReference The service reference to the consumer.
*/
protected void unbindJobConsumer(final ServiceReference serviceReference) {
this.unbindService(serviceReference, true);
}
/**
* Bind a new executor
* @param serviceReference The service reference to the executor.
*/
protected void bindJobExecutor(final ServiceReference serviceReference) {
this.bindService(serviceReference, false);
}
/**
* Unbind a executor
* @param serviceReference The service reference to the executor.
*/
protected void unbindJobExecutor(final ServiceReference serviceReference) {
this.unbindService(serviceReference, false);
}
/**
* Bind a consumer or executor
* @param serviceReference The service reference to the consumer or executor.
* @param isConsumer Indicating whether this is a JobConsumer or JobExecutor
*/
private void bindService(final ServiceReference serviceReference, final boolean isConsumer) {
final String[] topics = PropertiesUtil.toStringArray(serviceReference.getProperty(JobConsumer.PROPERTY_TOPICS));
if ( topics != null && topics.length > 0 ) {
final ConsumerInfo info = new ConsumerInfo(serviceReference, isConsumer);
boolean changed = false;
synchronized ( this.topicToConsumerMap ) {
for(final String t : topics) {
if ( t != null ) {
final String topic = t.trim();
if ( topic.length() > 0 ) {
List<ConsumerInfo> consumers = this.topicToConsumerMap.get(topic);
if ( consumers == null ) {
consumers = new ArrayList<JobConsumerManager.ConsumerInfo>();
this.topicToConsumerMap.put(topic, consumers);
changed = true;
}
consumers.add(info);
Collections.sort(consumers);
}
}
}
if ( changed ) {
this.calculateTopics(this.propagationService != null);
}
}
if ( changed && this.propagationService != null ) {
logger.debug("Updating property provider with: {}", this.topics);
this.propagationService.setProperties(this.getRegistrationProperties());
}
}
}
/**
* Unbind a consumer or executor
* @param serviceReference The service reference to the consumer or executor.
* @param isConsumer Indicating whether this is a JobConsumer or JobExecutor
*/
private void unbindService(final ServiceReference serviceReference, final boolean isConsumer) {
final String[] topics = PropertiesUtil.toStringArray(serviceReference.getProperty(JobConsumer.PROPERTY_TOPICS));
if ( topics != null && topics.length > 0 ) {
final ConsumerInfo info = new ConsumerInfo(serviceReference, isConsumer);
boolean changed = false;
synchronized ( this.topicToConsumerMap ) {
for(final String t : topics) {
if ( t != null ) {
final String topic = t.trim();
if ( topic.length() > 0 ) {
final List<ConsumerInfo> consumers = this.topicToConsumerMap.get(topic);
if ( consumers != null ) { // sanity check
for(final ConsumerInfo oldConsumer : consumers) {
if ( oldConsumer.equals(info) && oldConsumer.executor != null ) {
// notify listener
for(final Object[] listenerObjects : this.listenerMap.values()) {
if ( listenerObjects[0] == oldConsumer.executor ) {
final JobExecutionContext context = (JobExecutionContext)listenerObjects[1];
context.asyncProcessingFinished(context.result().failed());
break;
}
}
}
}
consumers.remove(info);
if ( consumers.size() == 0 ) {
this.topicToConsumerMap.remove(topic);
changed = true;
}
}
}
}
}
if ( changed ) {
this.calculateTopics(this.propagationService != null);
}
}
if ( changed && this.propagationService != null ) {
logger.debug("Updating property provider with: {}", this.topics);
this.propagationService.setProperties(this.getRegistrationProperties());
}
}
}
private boolean match(final String topic, final TopicMatcher[] matchers) {
for(final TopicMatcher m : matchers) {
if ( m.match(topic) != null ) {
return true;
}
}
return false;
}
private void calculateTopics(final boolean enabled) {
if ( enabled ) {
// create a sorted list - this ensures that the property value
// is always the same for the same topics.
final List<String> topicList = new ArrayList<String>();
for(final String topic : this.topicToConsumerMap.keySet() ) {
// check whitelist
if ( this.match(topic, this.whitelistMatchers) ) {
// and blacklist
if ( this.blacklistMatchers == null || !this.match(topic, this.blacklistMatchers) ) {
topicList.add(topic);
}
}
}
Collections.sort(topicList);
final StringBuilder sb = new StringBuilder();
boolean first = true;
for(final String topic : topicList ) {
if ( first ) {
first = false;
} else {
sb.append(',');
}
sb.append(topic);
}
this.topics = sb.toString();
} else {
this.topics = null;
}
}
/**
* Internal class caching some consumer infos like service id and ranking.
*/
private final static class ConsumerInfo implements Comparable<ConsumerInfo> {
public final ServiceReference serviceReference;
private final boolean isConsumer;
public JobExecutor executor;
public final int ranking;
public final long serviceId;
public ConsumerInfo(final ServiceReference serviceReference, final boolean isConsumer) {
this.serviceReference = serviceReference;
this.isConsumer = isConsumer;
final Object sr = serviceReference.getProperty(Constants.SERVICE_RANKING);
if ( sr == null || !(sr instanceof Integer)) {
this.ranking = 0;
} else {
this.ranking = (Integer)sr;
}
this.serviceId = (Long)serviceReference.getProperty(Constants.SERVICE_ID);
}
@Override
public int compareTo(final ConsumerInfo o) {
if ( this.ranking < o.ranking ) {
return 1;
} else if (this.ranking > o.ranking ) {
return -1;
}
// If ranks are equal, then sort by service id in descending order.
return (this.serviceId < o.serviceId) ? -1 : 1;
}
@Override
public boolean equals(final Object obj) {
if ( obj instanceof ConsumerInfo ) {
return ((ConsumerInfo)obj).serviceId == this.serviceId;
}
return false;
}
@Override
public int hashCode() {
return serviceReference.hashCode();
}
public JobExecutor getExecutor(final BundleContext bundleContext) {
if ( executor == null ) {
if ( this.isConsumer ) {
executor = new JobConsumerWrapper((JobConsumer) bundleContext.getService(this.serviceReference));
} else {
executor = (JobExecutor) bundleContext.getService(this.serviceReference);
}
}
return executor;
}
}
private final static class JobConsumerWrapper implements JobExecutor {
private final JobConsumer consumer;
public JobConsumerWrapper(final JobConsumer consumer) {
this.consumer = consumer;
}
@Override
public JobExecutionResult process(final Job job, final JobExecutionContext context) {
final JobConsumer.AsyncHandler asyncHandler =
new JobConsumer.AsyncHandler() {
final Object asyncLock = new Object();
final AtomicBoolean asyncDone = new AtomicBoolean(false);
private void check(final JobExecutionResult result) {
synchronized ( asyncLock ) {
if ( !asyncDone.get() ) {
asyncDone.set(true);
context.asyncProcessingFinished(result);
} else {
throw new IllegalStateException("Job is already marked as processed");
}
}
}
@Override
public void ok() {
this.check(context.result().succeeded());
}
@Override
public void failed() {
this.check(context.result().failed());
}
@Override
public void cancel() {
this.check(context.result().cancelled());
}
};
((JobImpl)job).setProperty(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER, asyncHandler);
final JobConsumer.JobResult result = this.consumer.process(job);
if ( result == JobResult.ASYNC ) {
return null;
} else if ( result == JobResult.FAILED) {
return context.result().failed();
} else if ( result == JobResult.OK) {
return context.result().succeeded();
}
return context.result().cancelled();
}
}
}