blob: dcf01014091c22f7a3afe50033be8f9d5af3ab5e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.event.impl.jobs.deprecated;
import java.util.Dictionary;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.event.EventPropertiesMap;
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventConstants;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Event admin bridge for compatibility.
*
* This event handler receives jobs from the event admin and adds them
* to the job manager. It uses an async queue for adding.
*
* This handler is enabled by default, to disable it provide a configuration
* which removes both topic properties (or sets them to null)
*
* @deprecated
*/
@Deprecated
@Component(immediate=true)
@Service(value={EventHandler.class, JobConsumer.class})
@Properties({
@Property(name=EventConstants.EVENT_TOPIC, value=JobUtil.TOPIC_JOB),
@Property(name=JobConsumer.PROPERTY_TOPICS, value="/")
})
public class EventAdminBridge
implements EventHandler, JobConsumer {
/** Default logger. */
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/** A local queue for writing received events into the repository. */
private final BlockingQueue<Event> writeQueue = new LinkedBlockingQueue<Event>();
/** Is the background task still running? */
private volatile boolean running;
@Reference
private JobManager jobManager;
/**
* Activate this component and start background thread
*/
@Activate
protected void activate() {
// start writer background thread
this.running = true;
final Thread writerThread = new Thread(new Runnable() {
@Override
public void run() {
addJobs();
}
}, "Apache Sling Job Event Bridge");
writerThread.setDaemon(true);
writerThread.start();
}
/**
* Deactivate this component.
*/
@Deactivate
protected void deactivate() {
this.running = false;
// stop write queue
try {
this.writeQueue.put(new Event("deactivate", (Dictionary<String, Object>)null));
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
logger.debug("Apache Sling Job Event Bridge stopped on instance {}", Environment.APPLICATION_ID);
}
/**
* Background thread adding jobs received as events
*/
private void addJobs() {
logger.debug("Apache Sling Job Event Bridge started on instance {}", Environment.APPLICATION_ID);
this.processWriteQueue();
}
private static String[] IGNORED_CONFIG_PROPERTIES = new String[] {
JobUtil.PROPERTY_JOB_PARALLEL,
JobUtil.PROPERTY_JOB_RUN_LOCAL,
JobUtil.PROPERTY_JOB_RETRIES,
JobUtil.PROPERTY_JOB_QUEUE_NAME,
JobUtil.PROPERTY_JOB_QUEUE_ORDERED,
JobUtil.PROPERTY_JOB_PRIORITY
};
/**
* The writer queue. One job is written on each run.
*/
private void processWriteQueue() {
while ( this.running ) {
try {
// so let's wait/get the next job from the queue
Event event = null;
try {
event = this.writeQueue.take();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
this.running = false;
}
if ( event != null && this.running ) {
final JobManager jm = this.jobManager;
if ( jm == null ) {
try {
this.writeQueue.put(event);
Thread.sleep(500);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
this.running = false;
}
} else {
final String jobTopic = (String)event.getProperty(ResourceHelper.PROPERTY_JOB_TOPIC);
final String jobName = (String)event.getProperty(JobUtil.PROPERTY_JOB_NAME);
final Map<String, Object> props = new EventPropertiesMap(event);
props.put(JobImpl.PROPERTY_BRIDGED_EVENT, Boolean.TRUE);
// check for deprecated/unsupported properties
for(final String ignoredProp : IGNORED_CONFIG_PROPERTIES) {
if ( props.containsKey(ignoredProp)) {
Utility.logDeprecated(logger, "Job " + EventUtil.toString(event) + " is using deprecated and ignored property " + ignoredProp);
props.remove(ignoredProp);
}
}
if ( jobName != null ) {
this.jobManager.addJob(jobTopic, jobName, props);
} else {
this.jobManager.addJob(jobTopic, props);
}
}
}
} catch (final Throwable t) { //NOSONAR
logger.error("Bridge thread got uncaught exception: " + t.getMessage(), t);
}
}
}
/**
* @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
*/
@Override
public void handleEvent(final Event event) {
if ( logger.isDebugEnabled() ) {
logger.debug("Receiving event {}", EventUtil.toString(event));
}
// we ignore remote job events
if ( EventUtil.isLocal(event) ) {
Utility.logDeprecated(logger, "Received job via deprecated OSGi event " + EventUtil.toString(event));
// check job topic
final String errorMessage = Utility.checkJobTopic(event.getProperty(ResourceHelper.PROPERTY_JOB_TOPIC));
if ( errorMessage == null ) {
try {
this.writeQueue.put(event);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
this.logger.warn(errorMessage + " : {}", EventUtil.toString(event));
}
}
}
@Override
public JobResult process(final Job job) {
// this is never been called, but we return something anyway
return JobResult.CANCEL;
}
}