blob: 503d18ea23ba834dadf818c79f9680958f75335b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.event;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.jcr.Node;
import javax.jcr.Property;
import javax.jcr.PropertyIterator;
import javax.jcr.PropertyType;
import javax.jcr.RepositoryException;
import javax.jcr.Value;
import javax.jcr.ValueFactory;
import org.apache.jackrabbit.util.ISO9075;
import org.apache.sling.event.EventUtil.JobStatusNotifier.NotifierContext;
import org.apache.sling.event.impl.AbstractRepositoryEventHandler;
import org.apache.sling.event.impl.JobEventHandler;
import org.osgi.service.event.Event;
import org.slf4j.LoggerFactory;
/**
* The <code>EventUtil</code> class is a utility class for
* clustered environments.
*/
public abstract class EventUtil {
/** This event property indicates whether the event should be distributed in the cluster (default false). */
public static final String PROPERTY_DISTRIBUTE = "event.distribute";
/** This event property specifies the application node. */
public static final String PROPERTY_APPLICATION = "event.application";
/**
* Job Handling
*/
/** The job topic property. */
public static final String PROPERTY_JOB_TOPIC = "event.job.topic";
/** The property for the unique event id. Value is of type String. */
public static final String PROPERTY_JOB_ID = "event.job.id";
/** The property to set if a job can be run in parallel to any other job. */
public static final String PROPERTY_JOB_PARALLEL = "event.job.parallel";
/** The property to set if a job should only be run on the same app it has been created on. */
public static final String PROPERTY_JOB_RUN_LOCAL = "event.job.run.local";
/** The property to track the retry count for jobs. Value is of type Integer. */
public static final String PROPERTY_JOB_RETRY_COUNT = "event.job.retrycount";
/** The property for setting the maximum number of retries. Value is of type Integer. */
public static final String PROPERTY_JOB_RETRIES = "event.job.retries";
/** The property to set a retry delay. Value is of type Long and specifies milliseconds. */
public static final String PROPERTY_JOB_RETRY_DELAY = "event.job.retrydelay";
/** The property to set in order to put the jobs into a separate job queue. This property
* specifies the name of the job queue. If the job queue does not exist yet,
* a new queue is created.
* If a job queue is used, the jobs from this queue are never executed in parallel!
*/
public static final String PROPERTY_JOB_QUEUE_NAME = "event.job.queuename";
/** If this property is set with any value, the queue processes the jobs in the same
* order as they have arrived.
* This property only has an effect if {@link #PROPERTY_JOB_QUEUE_NAME} is specified.
*/
public static final String PROPERTY_JOB_QUEUE_ORDERED = "event.job.queueordered";
/** The topic for jobs. */
public static final String TOPIC_JOB = "org/apache/sling/event/job";
/**
* Timed Events
*/
/** The topic for timed events. */
public static final String TOPIC_TIMED_EVENT = "org/apache/sling/event/timed";
/** The real topic of the event. */
public static final String PROPERTY_TIMED_EVENT_TOPIC = "event.topic.timed";
/** The property for the unique event id. */
public static final String PROPERTY_TIMED_EVENT_ID = "event.timed.id";
/** The scheduler expression for the timed event. */
public static final String PROPERTY_TIMED_EVENT_SCHEDULE = "event.timed.scheduler";
/** The period for the timed event. */
public static final String PROPERTY_TIMED_EVENT_PERIOD = "event.timed.period";
/** The date for the timed event. */
public static final String PROPERTY_TIMED_EVENT_DATE = "event.timed.date";
/**
* Utility Methods
*/
/**
 * Build an event that is flagged for distribution across the cluster.
 * Every entry of the supplied dictionary is copied into a fresh dictionary
 * and the {@link #PROPERTY_DISTRIBUTE} marker is added to that copy.
 * @param topic The topic of the new event.
 * @param properties The properties to copy into the event; may be <code>null</code>.
 * @return An OSGi event.
 */
public static Event createDistributableEvent(String topic,
        Dictionary<String, Object> properties) {
    final Dictionary<String, Object> copy = new Hashtable<String, Object>();
    if ( properties != null ) {
        for (final Enumeration<String> keys = properties.keys(); keys.hasMoreElements(); ) {
            final String name = keys.nextElement();
            copy.put(name, properties.get(name));
        }
    }
    // only the presence of the marker is evaluated, so an empty string is sufficient
    copy.put(PROPERTY_DISTRIBUTE, "");
    return new Event(topic, copy);
}
/**
 * Tell whether the event carries the distribution marker.
 * Only the presence of {@link #PROPERTY_DISTRIBUTE} is checked, not its value.
 * @param event The event to inspect.
 * @return <code>true</code> if the event should be distributed.
 */
public static boolean shouldDistribute(Event event) {
    final Object marker = event.getProperty(PROPERTY_DISTRIBUTE);
    return marker != null;
}
/**
 * Tell whether the event originates from this application.
 * An event without an application id counts as local, as does an event
 * whose id equals {@link AbstractRepositoryEventHandler#APPLICATION_ID}.
 * @param event The event to inspect.
 * @return <code>true</code> if this is a local event
 */
public static boolean isLocal(Event event) {
    final String origin = getApplicationId(event);
    if ( origin == null ) {
        return true;
    }
    return origin.equals(AbstractRepositoryEventHandler.APPLICATION_ID);
}
/**
 * Look up the id of the application the event was created at.
 * The id is read from the {@link #PROPERTY_APPLICATION} event property.
 * @param event The event to inspect.
 * @return The application id or null if the event has been created locally.
 */
public static String getApplicationId(Event event) {
    final Object applicationId = event.getProperty(PROPERTY_APPLICATION);
    return (String) applicationId;
}
/**
 * Tell whether the event is a job event.
 * An event is treated as a job as soon as it contains the
 * {@link #PROPERTY_JOB_TOPIC} property, regardless of the value.
 * @param event The event to check.
 * @return <code>true</code> if this is a job event.
 */
public static boolean isJobEvent(Event event) {
    final Object jobTopic = event.getProperty(PROPERTY_JOB_TOPIC);
    return jobTopic != null;
}
/**
 * Fetch the notifier context stored in a job event.
 * @param job The event to inspect.
 * @return The notifier context, or <code>null</code> if the event is not a job event.
 * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
 */
private static JobStatusNotifier.NotifierContext getNotifierContext(final Event job) {
    if ( isJobEvent(job) ) {
        final Object stored = job.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
        final JobStatusNotifier.NotifierContext context = (NotifierContext) stored;
        if ( context != null ) {
            return context;
        }
        throw new IllegalArgumentException("JobStatusNotifier context is not available in event properties.");
    }
    // plain events have no context
    return null;
}
/**
 * Report a job as finished to the notifier stored in the event.
 * Events that are not job events are silently ignored.
 * @param job The job event.
 * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
 */
public static void finishedJob(Event job) {
    final JobStatusNotifier.NotifierContext context = getNotifierContext(job);
    if ( context == null ) {
        return;
    }
    context.notifier.finishedJob(job, context.eventNodePath, false);
}
/**
 * Report a job as failed and ask the notifier to reschedule it.
 * Events that are not job events are never rescheduled.
 * @param job The job event.
 * @return <code>true</code> if the job has been rescheduled, <code>false</code> otherwise.
 * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
 */
public static boolean rescheduleJob(Event job) {
    final JobStatusNotifier.NotifierContext context = getNotifierContext(job);
    return context != null
        && context.notifier.finishedJob(job, context.eventNodePath, true);
}
/**
 * Run a job processor in the background and report the outcome.
 * Before the work is started an acknowledge is sent through the notifier
 * context of the job (if there is one). If the acknowledge is refused, the
 * job is still processed but its outcome is not reported.
 * A processor returning <code>true</code> marks the job as finished, one
 * returning <code>false</code> requests a reschedule. A processor that throws
 * is logged and the job is treated as finished, i.e. it is not rescheduled.
 * The work runs on {@link JobEventHandler#JOB_THREAD_POOL} if that is set,
 * otherwise on a newly created thread.
 * @param job The job event.
 * @param processor The processor doing the actual work.
 * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
 */
public static void processJob(final Event job, final JobProcessor processor) {
    final JobStatusNotifier.NotifierContext context = getNotifierContext(job);
    // without a context there is nobody to ask, so the outcome is reported as usual
    final boolean reportOutcome =
        context == null || context.notifier.sendAcknowledge(job, context.eventNodePath);
    if ( !reportOutcome ) {
        // a refused ack means someone else is already processing this job:
        // we still process it, but leave the job event handler alone
        LoggerFactory.getLogger(EventUtil.class).info("Someone else is already processing job {}.", job);
    }
    final Runnable worker = new Runnable() {
        /**
         * @see java.lang.Runnable#run()
         */
        public void run() {
            boolean succeeded = false;
            try {
                succeeded = processor.process(job);
            } catch (Throwable t) {
                LoggerFactory.getLogger(EventUtil.class).error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + job, t);
                // an exception never leads to a reschedule
                succeeded = true;
            } finally {
                if ( reportOutcome && succeeded ) {
                    EventUtil.finishedJob(job);
                } else if ( reportOutcome ) {
                    EventUtil.rescheduleJob(job);
                }
            }
        }
    };
    if ( JobEventHandler.JOB_THREAD_POOL == null ) {
        // no pool available: fall back to a dedicated thread. This should never
        // happen for jobs, but it allows using this method for other background work.
        new Thread(worker).start();
    } else {
        JobEventHandler.JOB_THREAD_POOL.execute(worker);
    }
}
/**
 * This is a private interface which is only public for import reasons.
 * It is the callback through which the outcome of a job is reported.
 */
public static interface JobStatusNotifier {
    /** Name of the event property under which the {@link NotifierContext} is stored. */
    String CONTEXT_PROPERTY_NAME = JobStatusNotifier.class.getName();

    /**
     * Acknowledge that someone has started processing the job.
     * @param job The job.
     * @param eventNodePath The storage node in the repository.
     * @return <code>true</code> if the ack is ok, <code>false</code> otherwise (e.g. if
     *         someone else already sent an ack for this job).
     */
    boolean sendAcknowledge(Event job, String eventNodePath);

    /**
     * Report that processing of the job has ended.
     * Without rescheduling, a return value of <code>false</code> indicates an error
     * during the processing. With rescheduling, <code>true</code> indicates that the
     * job could be rescheduled; <code>false</code> is returned if an error occurs or
     * the number of retries is exceeded.
     * @param job The job.
     * @param eventNodePath The storage node in the repository.
     * @param reschedule Should the event be rescheduled?
     * @return <code>true</code> if everything went fine, <code>false</code> otherwise.
     */
    boolean finishedJob(Event job, String eventNodePath, boolean reschedule);

    /**
     * Immutable pair of a notifier and the repository path of the job's storage node.
     */
    final class NotifierContext {
        /** The notifier to call back. */
        public final JobStatusNotifier notifier;
        /** The storage node of the job in the repository. */
        public final String eventNodePath;

        public NotifierContext(JobStatusNotifier notifier, String eventNodePath) {
            this.notifier = notifier;
            this.eventNodePath = eventNodePath;
        }
    }
}
/**
 * Store the entries of a plain map as properties of the node.
 * The map is wrapped into an {@link EventPropertiesMap} and handed to
 * {@link #addProperties(Node, EventPropertiesMap, String[], String)}, which
 * writes convertible entries directly and all others into one binary property.
 *
 * @param node The node where all properties are added to
 * @param properties The map of properties.
 * @param ignoreProps optional list of property names which should be ignored
 * @param binPropertyName The name of the binary property.
 * @throws RepositoryException
 */
public static void addProperties(final Node node,
        final Map<String, Object> properties,
        final String[] ignoreProps,
        final String binPropertyName)
throws RepositoryException {
    final EventPropertiesMap wrapped = new EventPropertiesMap(properties);
    addProperties(node, wrapped, ignoreProps, binPropertyName);
}
/**
 * Store the entries of the property map as properties of the node.
 * An entry whose name and value can be converted into a repository property
 * is written directly. All remaining entries are serialized together into a
 * single binary property. Entries with a <code>null</code> value and entries
 * listed in <code>ignoreProps</code> are skipped. Nothing happens if the
 * property map is <code>null</code>.
 *
 * @param node The node where all properties are added to
 * @param properties The map of properties.
 * @param ignoreProps optional list of property names which should be ignored
 * @param binPropertyName The name of the binary property.
 * @throws RepositoryException If writing to the node fails or a value can not be serialized.
 */
public static void addProperties(final Node node,
        final EventPropertiesMap properties,
        final String[] ignoreProps,
        final String binPropertyName)
throws RepositoryException {
    if ( properties == null ) {
        return;
    }
    final List<String> skipped = (ignoreProps != null ? Arrays.asList(ignoreProps) : null);
    // first pass: write what can be written directly, remember the rest
    final List<String> blobKeys = new ArrayList<String>();
    for (final Iterator<Map.Entry<String, Object>> entries = properties.entrySet().iterator(); entries.hasNext(); ) {
        final Map.Entry<String, Object> entry = entries.next();
        if ( skipped != null && skipped.contains(entry.getKey()) ) {
            continue;
        }
        // sanity check
        if ( entry.getValue() == null ) {
            continue;
        }
        if ( !setProperty(entry.getKey(), entry.getValue(), node) ) {
            blobKeys.add(entry.getKey());
        }
    }
    if ( blobKeys.isEmpty() ) {
        return;
    }
    // second pass: serialize the remaining properties as count + (name, value) pairs
    try {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        final ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeInt(blobKeys.size());
        for (final String key : blobKeys) {
            out.writeObject(key);
            try {
                out.writeObject(properties.get(key));
            } catch (IOException ioe) {
                throw new RepositoryException("Unable to serialize property " + key, ioe);
            }
        }
        out.close();
        node.setProperty(binPropertyName, new ByteArrayInputStream(bytes.toByteArray()));
    } catch (IOException ioe) {
        throw new RepositoryException("Unable to serialize properties " + properties, ioe);
    }
}
/**
 * Read properties from a repository node and create a property map.
 * The map is filled from two sources: first the serialized blob stored in
 * the binary property (if present), then every property set directly on the
 * node. A directly set property overwrites a blob entry with the same name.
 * Properties whose name starts with <code>jcr:</code> or with one of the
 * given prefixes are skipped. Multi-valued properties are returned as arrays
 * typed after their first value; empty multi-valued properties are omitted.
 *
 * @param node The node to read from.
 * @param binPropertyName The name of the binary property holding the serialized entries.
 * @param ignorePrefixes Optional name prefixes of properties to skip, may be <code>null</code>.
 * @return The property map.
 * @throws RepositoryException If the repository can not be read or the blob can not be deserialized.
 * @throws ClassNotFoundException If a serialized value refers to a missing or incompatible class.
 */
public static EventPropertiesMap readProperties(final Node node,
        final String binPropertyName,
        final String[] ignorePrefixes)
throws RepositoryException, ClassNotFoundException {
    final Map<String, Object> properties = new HashMap<String, Object>();
    // check the properties blob
    if ( node.hasProperty(binPropertyName) ) {
        // the stream handed out by the property has to be closed by us,
        // so keep a reference even if creating the object stream fails
        final java.io.InputStream blob = node.getProperty(binPropertyName).getStream();
        try {
            final ObjectInputStream ois = new ObjectInputStream(blob);
            final int length = ois.readInt();
            for(int i=0;i<length;i++) {
                final String key = (String)ois.readObject();
                final Object value = ois.readObject();
                properties.put(key, value);
            }
        } catch (java.io.InvalidClassException ice) {
            throw new ClassNotFoundException("Found invalid class.", ice);
        } catch (IOException ioe) {
            throw new RepositoryException("Unable to deserialize event properties.", ioe);
        } finally {
            try {
                blob.close();
            } catch (IOException ignored) {
                // best effort: a failing close must not hide the result or the original error
            }
        }
    }
    // now all properties that have been set directly
    final PropertyIterator pI = node.getProperties();
    while ( pI.hasNext() ) {
        final Property p = pI.nextProperty();
        boolean ignore = p.getName().startsWith("jcr:");
        if ( !ignore && ignorePrefixes != null ) {
            int index = 0;
            while ( !ignore && index < ignorePrefixes.length ) {
                ignore = p.getName().startsWith(ignorePrefixes[index]);
                index++;
            }
        }
        if ( !ignore ) {
            final String name = ISO9075.decode(p.getName());
            if ( p.getDefinition().isMultiple() ) {
                final Value[] values = p.getValues();
                if ( values.length > 0 ) {
                    // the first value decides the component type of the array
                    final Object firstObject = getPropertyValue(values[0]);
                    final Object[] array;
                    if ( firstObject instanceof Boolean ) {
                        array = new Boolean[values.length];
                    } else if ( firstObject instanceof Calendar ) {
                        array = new Calendar[values.length];
                    } else if ( firstObject instanceof Double ) {
                        array = new Double[values.length];
                    } else if ( firstObject instanceof Long ) {
                        array = new Long[values.length];
                    } else {
                        array = new String[values.length];
                    }
                    array[0] = firstObject;
                    int index = 1;
                    while ( index < values.length ) {
                        array[index] = getPropertyValue(values[index]);
                        index++;
                    }
                    properties.put(name, array);
                }
            } else {
                final Value value = p.getValue();
                final Object o = getPropertyValue(value);
                properties.put(name, o);
            }
        }
    }
    return new EventPropertiesMap(properties);
}
/**
 * Convert a java property name into a repository property name.
 * Names containing a colon can not be used as a repository property name;
 * all other names are ISO9075 encoded.
 * @param name The java object property name
 * @return The converted name or null if not possible.
 */
public static String getNodePropertyName(final String name) {
    final boolean hasColon = name.indexOf(':') != -1;
    return hasColon ? null : ISO9075.encode(name);
}
/**
 * Convert an event value into a repository value.
 * Supported types are Calendar, Long, Double, Boolean and String.
 * @param valueFactory The value factory
 * @param eventValue The event value
 * @return The converted value or null if the type is not supported
 */
public static Value getNodePropertyValue(final ValueFactory valueFactory, final Object eventValue) {
    if (eventValue instanceof Calendar) {
        return valueFactory.createValue((Calendar)eventValue);
    }
    if (eventValue instanceof Long) {
        return valueFactory.createValue((Long)eventValue);
    }
    if (eventValue instanceof Double) {
        return valueFactory.createValue(((Double)eventValue).doubleValue());
    }
    if (eventValue instanceof Boolean) {
        return valueFactory.createValue((Boolean) eventValue);
    }
    if (eventValue instanceof String) {
        return valueFactory.createValue((String)eventValue);
    }
    // anything else has no direct repository representation
    return null;
}
/**
 * Convert a repository value back to a java object.
 * Boolean, date, double and long values are returned as Boolean, Calendar,
 * Double and Long; every other type is returned as its string form.
 * @param value The repository value.
 * @return The java object for the value.
 * @throws RepositoryException
 */
private static Object getPropertyValue(final Value value)
throws RepositoryException {
    switch (value.getType()) {
        case PropertyType.BOOLEAN:
            return Boolean.valueOf(value.getBoolean());
        case PropertyType.DATE:
            return value.getDate();
        case PropertyType.DOUBLE:
            return Double.valueOf(value.getDouble());
        case PropertyType.LONG:
            return Long.valueOf(value.getLong());
        case PropertyType.STRING:
        default:
            // other types should never happen - they are converted to a string
            return value.getString();
    }
}
/**
 * Try to set the java property as a property of the node.
 * A single value is written if it can be converted by
 * {@link #getNodePropertyValue(ValueFactory, Object)}. An object array is
 * written as a multi-valued property if every element can be converted and
 * all converted values have the same property type. In every other case
 * nothing is written and the caller has to store the value differently.
 * @param name The java property name.
 * @param value The java property value, must not be <code>null</code>.
 * @param node The node to write to.
 * @return <code>true</code> if the property has been written to the node,
 *         <code>false</code> if the name or the value can not be converted.
 * @throws RepositoryException
 */
private static boolean setProperty(String name, Object value, Node node)
throws RepositoryException {
    final String propName = getNodePropertyName(name);
    if ( propName == null ) {
        return false;
    }
    final ValueFactory fac = node.getSession().getValueFactory();
    // check for multi value
    if ( value.getClass().isArray() ) {
        // primitive arrays (e.g. byte[]) are arrays but not Object[]; casting
        // them would throw a ClassCastException, so they are left to the caller
        if ( !(value instanceof Object[]) ) {
            return false;
        }
        final Object[] array = (Object[])value;
        // now we try to convert each value
        // and check if all converted values have the same type
        final Value[] values = new Value[array.length];
        for(int index = 0; index < array.length; index++) {
            values[index] = getNodePropertyValue(fac, array[index]);
            if ( values[index] == null ) {
                return false;
            }
            // compare the property types; the implementation class of a Value
            // says nothing about the type it holds
            if ( index > 0 && values[index-1].getType() != values[index].getType() ) {
                return false;
            }
        }
        node.setProperty(propName, values);
        return true;
    }
    final Value val = getNodePropertyValue(fac, value);
    if ( val != null ) {
        node.setProperty(propName, val);
        return true;
    }
    return false;
}
/**
 * Create a readable description of an event.
 * The result consists of the class name of the event, its topic and all of
 * its properties as a comma separated list of <code>name=value</code> pairs.
 * @param e The event, may be <code>null</code>.
 * @return The description, or <code>&lt;null&gt;</code> if no event is given.
 */
public static String toString(final Event e) {
    if ( e == null ) {
        return "<null>";
    }
    final StringBuilder text = new StringBuilder(e.getClass().getName());
    text.append(" [topic=").append(e.getTopic()).append(", properties=");
    final String[] names = e.getPropertyNames();
    if ( names != null ) {
        // empty before the first pair, a comma before every following one
        String separator = "";
        for (final String name : names) {
            text.append(separator);
            text.append(name).append('=').append(e.getProperty(name));
            separator = ",";
        }
    }
    text.append("]");
    return text.toString();
}
}