blob: 8bc982f235adc60781ddc5e2a522ebcad9020e0c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.event.dea.impl;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.sling.api.SlingConstants;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.event.dea.DEAConstants;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventConstants;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This event handler distributes events from other application nodes in
* the cluster on the current instance.
* <p>
* It's listening for resource added events in the resource tree storing the
* distributed events. If a new resource is added, the resource is read,
* converted to an event and then send using the local event admin.
* <p>
*/
public class DistributedEventSender
implements EventHandler {
/** Default logger. */
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/** Is the background task still running? */
private volatile boolean running;
/** A local queue for serializing the event processing. */
private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
private final ResourceResolverFactory resourceResolverFactory;
private final EventAdmin eventAdmin;
private final String ownRootPathWithSlash;
private volatile ServiceRegistration serviceRegistration;
public DistributedEventSender(final BundleContext bundleContext,
final String rootPath,
final String ownRootPath,
final ResourceResolverFactory rrFactory,
final EventAdmin eventAdmin) {
this.eventAdmin = eventAdmin;
this.resourceResolverFactory = rrFactory;
this.ownRootPathWithSlash = ownRootPath + "/";
this.running = true;
final Thread backgroundThread = new Thread(new Runnable() {
@Override
public void run() {
// create service registration properties
final Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
// listen for all resource added OSGi events in the DEA area
props.put(EventConstants.EVENT_TOPIC, SlingConstants.TOPIC_RESOURCE_ADDED);
props.put(EventConstants.EVENT_FILTER, "(path=" + rootPath + "/*)");
final ServiceRegistration reg =
bundleContext.registerService(new String[] {EventHandler.class.getName()},
DistributedEventSender.this, props);
DistributedEventSender.this.serviceRegistration = reg;
try {
runInBackground();
} catch (Throwable t) { //NOSONAR
logger.error("Background thread stopped with exception: " + t.getMessage(), t);
running = false;
}
}
});
backgroundThread.start();
}
/**
* Deactivate this component.
*/
public void stop() {
if ( this.serviceRegistration != null ) {
this.serviceRegistration.unregister();
this.serviceRegistration = null;
}
// stop background threads by putting empty objects into the queue
this.running = false;
try {
this.queue.put("");
} catch (final InterruptedException e) {
this.ignoreException(e);
Thread.currentThread().interrupt();
}
}
/**
* Read an event from the resource
* @return The event object or <code>null</code>
*/
private Event readEvent(final Resource eventResource) {
try {
final ValueMap vm = ResourceHelper.getValueMap(eventResource);
final String topic = vm.get(EventConstants.EVENT_TOPIC, String.class);
if ( topic == null ) {
// no topic should never happen as we check the resource type before
logger.error("Unable to read distributed event from " + eventResource.getPath() + " : no topic property available.");
} else {
final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm);
// only send event if there are no read errors, otherwise discard it
@SuppressWarnings("unchecked")
final List<Exception> readErrorList = (List<Exception>) properties.remove(ResourceHelper.PROPERTY_MARKER_READ_ERROR_LIST);
if ( readErrorList == null ) {
properties.remove(EventConstants.EVENT_TOPIC);
properties.remove(DEAConstants.PROPERTY_DISTRIBUTE);
final Object oldRT = properties.remove("event.dea." + ResourceResolver.PROPERTY_RESOURCE_TYPE);
if ( oldRT != null ) {
properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, oldRT);
} else {
properties.remove(ResourceResolver.PROPERTY_RESOURCE_TYPE);
}
try {
final Event event = new Event(topic, properties);
return event;
} catch (final IllegalArgumentException iae) {
// this exception occurs if the topic is not correct (it should never happen,
// but you never know)
logger.error("Unable to read event: " + iae.getMessage(), iae);
}
} else {
for(final Exception e : readErrorList) {
logger.warn("Unable to read distributed event from " + eventResource.getPath(), e);
}
}
}
} catch (final InstantiationException ie) {
// something happened with the resource in the meantime
this.ignoreException(ie);
}
return null;
}
/**
* Background thread
*/
private void runInBackground() {
while ( this.running ) {
// so let's wait/get the next event from the queue
String path = null;
try {
path = this.queue.take();
} catch (final InterruptedException e) {
this.ignoreException(e);
Thread.currentThread().interrupt();
this.running = false;
}
if ( path != null && path.length() > 0 && this.running ) {
ResourceResolver resolver = null;
try {
resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
final Resource eventResource = resolver.getResource(path);
if ( DistributedEventAdminImpl.RESOURCE_TYPE_EVENT.equals(eventResource.getResourceType())) {
final Event e = this.readEvent(eventResource);
if ( e != null ) {
// we check event admin as processing is async
final EventAdmin localEA = this.eventAdmin;
if ( localEA != null ) {
localEA.postEvent(e);
} else {
this.logger.error("Unable to post event as no event admin is available.");
}
}
}
} catch (final LoginException ex) {
this.logger.error("Exception during creation of resource resolver.", ex);
} finally {
if ( resolver != null ) {
resolver.close();
}
}
}
}
}
/**
* @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
*/
@Override
public void handleEvent(final Event event) {
final String path = (String) event.getProperty(SlingConstants.PROPERTY_PATH);
if ( !path.startsWith(this.ownRootPathWithSlash) ) {
try {
this.queue.put(path);
} catch (final InterruptedException ex) {
this.ignoreException(ex);
Thread.currentThread().interrupt();
}
}
}
/**
* Helper method which just logs the exception in debug mode.
* @param e
*/
private void ignoreException(final Exception e) {
if ( this.logger.isDebugEnabled() ) {
this.logger.debug("Ignored exception " + e.getMessage(), e);
}
}
}