blob: d59cf25d62ddc4843cf968f46c535fb530d8189c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.event.impl;
import java.util.Calendar;
import java.util.Dictionary;
import javax.jcr.Node;
import javax.jcr.NodeIterator;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.observation.EventIterator;
import javax.jcr.query.Query;
import org.apache.jackrabbit.util.ISO8601;
import org.apache.sling.commons.osgi.OsgiUtil;
import org.apache.sling.event.EventUtil;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
/**
* This event handler distributes events across an application cluster.
* @scr.component inherit="true" label="%dist.events.name" description="%dist.events.description"
* @scr.property name="event.topics" value="*" private="true"
* @scr.property name="event.filter" value="(event.distribute=*)" private="true"
* @scr.property name="repository.path" value="/var/eventing/distribution" private="true"
*
* We schedule this event handler to run in the background and clean up
* obsolete events.
* @scr.service interface="java.lang.Runnable"
* @scr.property name="scheduler.period" value="1800" type="Long"
* @scr.property name="scheduler.concurrent" value="false" type="Boolean" private="true"
*/
public class DistributingEventHandler
extends AbstractRepositoryEventHandler
implements Runnable {
/** Default clean up time is 15 minutes. */
protected static final int DEFAULT_CLEANUP_PERIOD = 15;
/** @scr.property valueRef="DEFAULT_CLEANUP_PERIOD" type="Integer" */
protected static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
/** We remove everything which is older than 15min by default. */
protected int cleanupPeriod = DEFAULT_CLEANUP_PERIOD;
/**
* @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#activate(org.osgi.service.component.ComponentContext)
*/
protected void activate(ComponentContext context)
throws Exception {
@SuppressWarnings("unchecked")
final Dictionary<String, Object> props = context.getProperties();
this.cleanupPeriod = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD);
super.activate(context);
}
/**
* Return the query string for the clean up.
*/
protected String getCleanUpQueryString() {
final Calendar deleteBefore = Calendar.getInstance();
deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
final String dateString = ISO8601.format(deleteBefore);
final StringBuffer buffer = new StringBuffer("/jcr:root");
buffer.append(this.repositoryPath);
buffer.append("//element(*, ");
buffer.append(getEventNodeType());
buffer.append(")[@");
buffer.append(EventHelper.NODE_PROPERTY_CREATED);
buffer.append(" < xs:dateTime('");
buffer.append(dateString);
buffer.append("')]");
return buffer.toString();
}
/**
* This method is invoked periodically.
* @see java.lang.Runnable#run()
*/
public void run() {
if ( this.cleanupPeriod > 0 ) {
this.logger.debug("Cleaning up repository, removing all entries older than {} minutes.", this.cleanupPeriod);
final String queryString = this.getCleanUpQueryString();
// we create an own session for concurrency issues
Session s = null;
try {
s = this.createSession();
final Node parentNode = (Node)s.getItem(this.repositoryPath);
logger.debug("Executing query {}", queryString);
final Query q = s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH);
final NodeIterator iter = q.execute().getNodes();
int count = 0;
while ( iter.hasNext() ) {
final Node eventNode = iter.nextNode();
eventNode.remove();
count++;
}
parentNode.save();
logger.debug("Removed {} entries from the repository.", count);
} catch (RepositoryException e) {
// in the case of an error, we just log this as a warning
this.logger.warn("Exception during repository cleanup.", e);
} finally {
if ( s != null ) {
s.logout();
}
}
}
}
/**
* @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
*/
protected void processWriteQueue() {
while ( this.running ) {
// so let's wait/get the next job from the queue
Event event = null;
try {
event = this.writeQueue.take();
} catch (InterruptedException e) {
// we ignore this
this.ignoreException(e);
}
if ( event != null && this.running ) {
try {
final Node eventNode = this.writeEvent(event, null);
final EventInfo info = new EventInfo();
info.event = event;
info.nodePath = eventNode.getPath();
try {
this.queue.put(info);
} catch (InterruptedException e) {
// we ignore this
this.ignoreException(e);
}
} catch (Exception e) {
this.logger.error("Exception during writing the event to the repository.", e);
}
}
}
}
/**
* @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#runInBackground()
*/
protected void runInBackground() {
while ( this.running ) {
// so let's wait/get the next job from the queue
EventInfo info = null;
try {
info = this.queue.take();
} catch (InterruptedException e) {
// we ignore this
this.ignoreException(e);
}
if ( info != null && this.running ) {
if ( info.nodePath != null) {
Session session = null;
try {
session = this.createSession();
final Node eventNode = (Node)session.getItem(info.nodePath);
final EventAdmin localEA = this.eventAdmin;
if ( localEA != null ) {
localEA.postEvent(this.readEvent(eventNode));
} else {
this.logger.error("Unable to post event as no event admin is available.");
}
} catch (Exception ex) {
this.logger.error("Exception during reading the event from the repository.", ex);
} finally {
if ( session != null ) {
session.logout();
}
}
}
}
}
}
/**
* @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
*/
public void handleEvent(final Event event) {
try {
this.writeQueue.put(event);
} catch (InterruptedException ex) {
// we ignore this
this.ignoreException(ex);
}
}
/**
* @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
*/
public void onEvent(final EventIterator iterator) {
while ( iterator.hasNext() ) {
final javax.jcr.observation.Event event = iterator.nextEvent();
try {
final EventInfo info = new EventInfo();
info.nodePath = event.getPath();
this.queue.put(info);
} catch (InterruptedException ex) {
// we ignore this
this.ignoreException(ex);
} catch (RepositoryException ex) {
this.logger.error("Exception during reading the event from the repository.", ex);
}
}
}
/**
* @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#addEventProperties(javax.jcr.Node, java.util.Dictionary)
*/
protected void addEventProperties(Node eventNode, Dictionary<String, Object> properties)
throws RepositoryException {
super.addEventProperties(eventNode, properties);
properties.put(EventUtil.PROPERTY_APPLICATION, eventNode.getProperty(EventHelper.NODE_PROPERTY_APPLICATION).getString());
}
/**
* @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#startWriterSession()
*/
protected void startWriterSession() throws RepositoryException {
super.startWriterSession();
this.writerSession.getWorkspace().getObservationManager()
.addEventListener(this, javax.jcr.observation.Event.NODE_ADDED, this.repositoryPath, true, null, new String[] {this.getEventNodeType()}, true);
}
}