Move Sling to new TLP location
git-svn-id: https://svn.eu.apache.org/repos/asf/sling/tags/org.apache.sling.event-2.0.4-incubator@785979 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..32a792a
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,8 @@
+Apache Sling Event
+Copyright 2008-2009 The Apache Software Foundation
+
+Apache Sling is based on source code originally developed
+by Day Software (http://www.day.com/).
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
diff --git a/README.txt b/README.txt
new file mode 100644
index 0000000..3c5e9f8
--- /dev/null
+++ b/README.txt
@@ -0,0 +1,39 @@
+Apache Sling Event
+
+This bundle provides additional features for event handling, like replication
+events in a cluster, providing the support for timed events and jobs.
+
+
+Disclaimer
+==========
+Apache Sling is an effort undergoing incubation at The Apache Software Foundation (ASF),
+sponsored by the Apache Jackrabbit PMC. Incubation is required of all newly accepted
+projects until a further review indicates that the infrastructure, communications,
+and decision making process have stabilized in a manner consistent with other
+successful ASF projects. While incubation status is not necessarily a reflection of
+the completeness or stability of the code, it does indicate that the project has yet
+to be fully endorsed by the ASF.
+
+Getting Started
+===============
+
+This component uses a Maven 2 (http://maven.apache.org/) build
+environment. It requires a Java 5 JDK (or higher) and Maven (http://maven.apache.org/)
+2.0.7 or later. We recommend to use the latest Maven version.
+
+If you have Maven 2 installed, you can compile and
+package the jar using the following command:
+
+ mvn package
+
+See the Maven 2 documentation for other build features.
+
+The latest source code for this component is available in the
+Subversion (http://subversion.tigris.org/) source repository of
+the Apache Software Foundation. If you have Subversion installed,
+you can checkout the latest source using the following command:
+
+ svn checkout http://svn.apache.org/repos/asf/incubator/sling/trunk/extensions/event
+
+See the Subversion documentation for other source control features.
+
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..16c5ea6
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>sling</artifactId>
+ <version>5-incubator</version>
+ <relativePath>../../../parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>org.apache.sling.event</artifactId>
+ <packaging>bundle</packaging>
+ <version>2.0.4-incubator</version>
+
+ <name>Apache Sling Event Support</name>
+ <description>
+ Support for eventing.
+ </description>
+
+ <scm>
+ <connection>scm:svn:http://svn.apache.org/repos/asf/incubator/sling/tags/org.apache.sling.event-2.0.4-incubator</connection>
+ <developerConnection>scm:svn:https://svn.apache.org/repos/asf/incubator/sling/tags/org.apache.sling.event-2.0.4-incubator</developerConnection>
+ <url>http://svn.apache.org/viewvc/incubator/sling/tags/org.apache.sling.event-2.0.4-incubator</url>
+ </scm>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-scr-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <Export-Package>
+ org.apache.sling.event;version=${pom.version}
+ </Export-Package>
+ <Private-Package>
+ org.apache.sling.event.impl,
+ org.apache.jackrabbit.util
+ </Private-Package>
+ <!-- We need the dynamic import as events can contain any classes -->
+ <DynamicImport-Package>*</DynamicImport-Package>
+ <Sling-Nodetypes>
+ SLING-INF/nodetypes/event.cnd
+ </Sling-Nodetypes>
+ <Sling-Namespaces>
+ slingevent=http://sling.apache.org/jcr/event/1.0
+ </Sling-Namespaces>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <excludePackageNames>
+ org.apache.sling.event.impl
+ </excludePackageNames>
+ </configuration>
+ </plugin>
+ </plugins>
+ </reporting>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>javax.jcr</groupId>
+ <artifactId>jcr</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.api</artifactId>
+ <version>2.0.2-incubator</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.engine</artifactId>
+ <version>2.0.2-incubator</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.jcr.api</artifactId>
+ <version>2.0.2-incubator</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.jcr.resource</artifactId>
+ <version>2.0.4-incubator</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.scheduler</artifactId>
+ <version>2.0.2-incubator</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.threads</artifactId>
+ <version>2.0.2-incubator</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>jackrabbit-jcr-commons</artifactId>
+ <version>1.4.2</version>
+ <scope>compile</scope>
+ </dependency>
+ <!-- Testing -->
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.testing</artifactId>
+ <version>2.0.4-incubator</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/src/main/java/org/apache/sling/event/EventPropertiesMap.java b/src/main/java/org/apache/sling/event/EventPropertiesMap.java
new file mode 100644
index 0000000..c69dc4b
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/EventPropertiesMap.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.osgi.service.event.Event;
+
+/**
+ * An implementation of a map that helps in dealing with properties
+ * of an OSGi event.
+ * This map implements both, the map and the dictionary interfaces.
+ */
+public class EventPropertiesMap
+ extends Dictionary<String, Object>
+ implements Map<String, Object>, Serializable {
+
+ private final Map<String, Object> delegatee;
+
+ /**
+ * Construct a new map out of an event object.
+ * The resulting map is modifiable. But any modification has
+ * no influence on the original properties of the event!
+ * @param event The event object.
+ */
+ public EventPropertiesMap(final Event event) {
+ // create a map out of the event properties
+ final Map<String, Object> props = new HashMap<String, Object>();
+ if ( event.getPropertyNames() != null ) {
+ for(final String key : event.getPropertyNames() ) {
+ props.put(key, event.getProperty(key));
+ }
+ }
+ this.delegatee = props;
+ }
+
+ /**
+ * Construct a new map out of another map.
+ * @param props The properties map object.
+ */
+ public EventPropertiesMap(final Map<String, Object> props) {
+ this.delegatee = props;
+ }
+
+ /**
+ * Construct a new map.
+ */
+ public EventPropertiesMap() {
+ this.delegatee = new HashMap<String, Object>();
+ }
+
+ /**
+ * @see java.util.Map#clear()
+ */
+ public void clear() {
+ delegatee.clear();
+ }
+
+ /**
+ * @see java.util.Map#containsKey(java.lang.Object)
+ */
+ public boolean containsKey(Object key) {
+ return delegatee.containsKey(key);
+ }
+
+ /**
+ * @see java.util.Map#containsValue(java.lang.Object)
+ */
+ public boolean containsValue(Object value) {
+ return delegatee.containsValue(value);
+ }
+
+ /**
+ * @see java.util.Map#entrySet()
+ */
+ public Set<java.util.Map.Entry<String, Object>> entrySet() {
+ return delegatee.entrySet();
+ }
+
+ /**
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o) {
+ return delegatee.equals(o);
+ }
+
+ /**
+ * @see java.util.Dictionary#get(java.lang.Object)
+ */
+ public Object get(Object key) {
+ return delegatee.get(key);
+ }
+
+ /**
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode() {
+ return delegatee.hashCode();
+ }
+
+ /**
+ * @see java.util.Dictionary#isEmpty()
+ */
+ public boolean isEmpty() {
+ return delegatee.isEmpty();
+ }
+
+ /**
+ * @see java.util.Map#keySet()
+ */
+ public Set<String> keySet() {
+ return delegatee.keySet();
+ }
+
+ /**
+ * @see java.util.Dictionary#put(java.lang.Object, java.lang.Object)
+ */
+ public Object put(String key, Object value) {
+ return delegatee.put(key, value);
+ }
+
+ /**
+ * @see java.util.Map#putAll(java.util.Map)
+ */
+ public void putAll(Map<? extends String, ? extends Object> t) {
+ delegatee.putAll(t);
+ }
+
+ /**
+ * @see java.util.Dictionary#remove(java.lang.Object)
+ */
+ public Object remove(Object key) {
+ return delegatee.remove(key);
+ }
+
+ /**
+ * @see java.util.Dictionary#size()
+ */
+ public int size() {
+ return delegatee.size();
+ }
+
+ /**
+ * @see java.util.Map#values()
+ */
+ public Collection<Object> values() {
+ return delegatee.values();
+ }
+
+ /**
+ * @see java.util.Dictionary#elements()
+ */
+ public Enumeration<Object> elements() {
+ return Collections.enumeration(this.values());
+ }
+
+ /**
+ * @see java.util.Dictionary#keys()
+ */
+ public Enumeration<String> keys() {
+ return Collections.enumeration(this.keySet());
+ }
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ return this.delegatee.toString();
+ }
+}
diff --git a/src/main/java/org/apache/sling/event/EventUtil.java b/src/main/java/org/apache/sling/event/EventUtil.java
new file mode 100644
index 0000000..503d18e
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/EventUtil.java
@@ -0,0 +1,615 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.jcr.Node;
+import javax.jcr.Property;
+import javax.jcr.PropertyIterator;
+import javax.jcr.PropertyType;
+import javax.jcr.RepositoryException;
+import javax.jcr.Value;
+import javax.jcr.ValueFactory;
+
+import org.apache.jackrabbit.util.ISO9075;
+import org.apache.sling.event.EventUtil.JobStatusNotifier.NotifierContext;
+import org.apache.sling.event.impl.AbstractRepositoryEventHandler;
+import org.apache.sling.event.impl.JobEventHandler;
+import org.osgi.service.event.Event;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The <code>EventUtil</code> class is an utility class for
+ * clustered environments.
+ */
+public abstract class EventUtil {
+
+ /** This event property indicates, if the event should be distributed in the cluster (default false). */
+ public static final String PROPERTY_DISTRIBUTE = "event.distribute";
+
+ /** This event property specifies the application node. */
+ public static final String PROPERTY_APPLICATION = "event.application";
+
+ /**
+ * Job Handling
+ */
+
+ /** The job topic property. */
+ public static final String PROPERTY_JOB_TOPIC = "event.job.topic";
+
+ /** The property for the unique event id. Value is of type String. */
+ public static final String PROPERTY_JOB_ID = "event.job.id";
+
+ /** The property to set if a job can be run parallel to any other job. */
+ public static final String PROPERTY_JOB_PARALLEL = "event.job.parallel";
+
+ /** The property to set if a job should only be run on the same app it has been created. */
+ public static final String PROPERTY_JOB_RUN_LOCAL = "event.job.run.local";
+
+ /** The property to track the retry count for jobs. Value is of type Integer. */
+ public static final String PROPERTY_JOB_RETRY_COUNT = "event.job.retrycount";
+
+ /** The property to for setting the maximum number of retries. Value is of type Integer. */
+ public static final String PROPERTY_JOB_RETRIES = "event.job.retries";
+
+ /** The property to set a retry delay. Value is of type Long and specifies milliseconds. */
+ public static final String PROPERTY_JOB_RETRY_DELAY = "event.job.retrydelay";
+
+ /** The property to set to put the jobs into a separate job queue. This property
+ * spcifies the name of the job queue. If the job queue does not exists yet
+ * a new queue is created.
+ * If a job queue is used, the jobs are never executed in parallel from this queue!
+ */
+ public static final String PROPERTY_JOB_QUEUE_NAME = "event.job.queuename";
+
+ /** If this property is set with any value, the queue processes the jobs in the same
+ * order as they have arrived.
+ * This property has only an effect if {@link #PROPERTY_JOB_QUEUE_NAME} is specified.
+ */
+ public static final String PROPERTY_JOB_QUEUE_ORDERED = "event.job.queueordered";
+
+ /** The topic for jobs. */
+ public static final String TOPIC_JOB = "org/apache/sling/event/job";
+
+ /**
+ * Timed Events
+ */
+
+ /** The topic for timed events. */
+ public static final String TOPIC_TIMED_EVENT = "org/apache/sling/event/timed";
+
+ /** The real topic of the event. */
+ public static final String PROPERTY_TIMED_EVENT_TOPIC = "event.topic.timed";
+
+ /** The property for the unique event id. */
+ public static final String PROPERTY_TIMED_EVENT_ID = "event.timed.id";
+
+ /** The scheduler expression for the timed event. */
+ public static final String PROPERTY_TIMED_EVENT_SCHEDULE = "event.timed.scheduler";
+
+ /** The period for the timed event. */
+ public static final String PROPERTY_TIMED_EVENT_PERIOD = "event.timed.period";
+
+ /** The date for the timed event. */
+ public static final String PROPERTY_TIMED_EVENT_DATE = "event.timed.date";
+
+ /**
+ * Utility Methods
+ */
+
+ /**
+ * Create a distributable event.
+ * A distributable event is distributed across the cluster.
+ * @param topic
+ * @param properties
+ * @return An OSGi event.
+ */
+ public static Event createDistributableEvent(String topic,
+ Dictionary<String, Object> properties) {
+ final Dictionary<String, Object> newProperties;
+ // create a new dictionary
+ newProperties = new Hashtable<String, Object>();
+ if ( properties != null ) {
+ final Enumeration<String> e = properties.keys();
+ while ( e.hasMoreElements() ) {
+ final String key = e.nextElement();
+ newProperties.put(key, properties.get(key));
+ }
+ }
+ // for now the value has no meaning, so we just put an empty string in it.
+ newProperties.put(PROPERTY_DISTRIBUTE, "");
+ return new Event(topic, newProperties);
+ }
+
+ /**
+ * Should this event be distributed in the cluster?
+ * @param event
+ * @return <code>true</code> if the event should be distributed.
+ */
+ public static boolean shouldDistribute(Event event) {
+ return event.getProperty(PROPERTY_DISTRIBUTE) != null;
+ }
+
+ /**
+ * Is this a local event?
+ * @param event
+ * @return <code>true</code> if this is a local event
+ */
+ public static boolean isLocal(Event event) {
+ final String appId = getApplicationId(event);
+ return appId == null || appId.equals(AbstractRepositoryEventHandler.APPLICATION_ID);
+ }
+
+ /**
+ * Return the application id the event was created at.
+ * @param event
+ * @return The application id or null if the event has been created locally.
+ */
+ public static String getApplicationId(Event event) {
+ return (String)event.getProperty(PROPERTY_APPLICATION);
+ }
+
+ /**
+ * Is this a job event?
+ * This method checks if the event contains the {@link #PROPERTY_JOB_TOPIC}
+ * property.
+ * @param event The event to check.
+ * @return <code>true></code> if this is a job event.
+ */
+ public static boolean isJobEvent(Event event) {
+ return event.getProperty(PROPERTY_JOB_TOPIC) != null;
+ }
+
+ /**
+ * Check if this a job event and return the notifier context.
+ * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
+ */
+ private static JobStatusNotifier.NotifierContext getNotifierContext(final Event job) {
+ // check if this is a job event
+ if ( !isJobEvent(job) ) {
+ return null;
+ }
+ final JobStatusNotifier.NotifierContext ctx = (NotifierContext) job.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
+ if ( ctx == null ) {
+ throw new IllegalArgumentException("JobStatusNotifier context is not available in event properties.");
+ }
+ return ctx;
+ }
+
+ /**
+ * Notify a finished job.
+ * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
+ */
+ public static void finishedJob(Event job) {
+ final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+ if ( ctx != null ) {
+ ctx.notifier.finishedJob(job, ctx.eventNodePath, false);
+ }
+ }
+
+ /**
+ * Notify a failed job.
+ * @return <code>true</code> if the job has been rescheduled, <code>false</code> otherwise.
+ * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
+ */
+ public static boolean rescheduleJob(Event job) {
+ final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+ if ( ctx != null ) {
+ return ctx.notifier.finishedJob(job, ctx.eventNodePath, true);
+ }
+ return false;
+ }
+
+ /**
+ * Process a job in the background and notify its success.
+ * This method also sends an acknowledge message to the job event handler.
+ */
+ public static void processJob(final Event job, final JobProcessor processor) {
+ // first check for a notifier context to send an acknowledge
+ boolean notify = true;
+ final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+ if ( ctx != null ) {
+ if ( !ctx.notifier.sendAcknowledge(job, ctx.eventNodePath) ) {
+ // if we don't get an ack, someone else is already processing this job.
+ // we process but do not notify the job event handler.
+ LoggerFactory.getLogger(EventUtil.class).info("Someone else is already processing job {}.", job);
+ notify = false;
+ }
+ }
+ final boolean notifyResult = notify;
+
+ final Runnable task = new Runnable() {
+
+ /**
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ boolean result = false;
+ try {
+ result = processor.process(job);
+ } catch (Throwable t) {
+ LoggerFactory.getLogger(EventUtil.class).error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + job, t);
+ // we don't reschedule if an exception occurs
+ result = true;
+ } finally {
+ if ( notifyResult ) {
+ if ( result ) {
+ EventUtil.finishedJob(job);
+ } else {
+ EventUtil.rescheduleJob(job);
+ }
+ }
+ }
+ }
+
+ };
+ // check if the job handler thread pool is available
+ if ( JobEventHandler.JOB_THREAD_POOL != null ) {
+ JobEventHandler.JOB_THREAD_POOL.execute(task);
+ } else {
+ // if we don't have a thread pool, we create the thread directly
+ // (this should never happen for jobs, but is a safe fallback and
+ // allows to call this method for other background processing.
+ new Thread(task).start();
+ }
+ }
+
+ /**
+ * This is a private interface which is only public for import reasons.
+ */
+ public static interface JobStatusNotifier {
+
+ String CONTEXT_PROPERTY_NAME = JobStatusNotifier.class.getName();
+
+ public static final class NotifierContext {
+ public final JobStatusNotifier notifier;
+ public final String eventNodePath;
+
+ public NotifierContext(JobStatusNotifier n, String path) {
+ this.notifier = n;
+ this.eventNodePath = path;
+ }
+ }
+
+ /**
+ * Send an acknowledge message that someone is processing the job.
+ * @param job The job.
+ * @param eventNodePath The storage node in the repository.
+ * @return <code>true</code> if the ack is ok, <code>false</code> otherwise (e.g. if
+ * someone else already send an ack for this job.
+ */
+ boolean sendAcknowledge(Event job, String eventNodePath);
+
+ /**
+ * Notify that the job is finished.
+ * If the job is not rescheduled, a return value of <code>false</code> indicates an error
+ * during the processing. If the job should be rescheduled, <code>true</code> indicates
+ * that the job could be rescheduled. If an error occurs or the number of retries is
+ * exceeded, <code>false</code> will be returned.
+ * @param job The job.
+ * @param eventNodePath The storage node in the repository.
+ * @param reschedule Should the event be rescheduled?
+ * @return <code>true</code> if everything went fine, <code>false</code> otherwise.
+ */
+ boolean finishedJob(Event job, String eventNodePath, boolean reschedule);
+ }
+
+ /**
+ * Add all java properties as properties to the node.
+ * If the name and the value of a map entry can easily converted into
+ * a repository property, it is directly added. All other java
+ * properties are stored in one binary property.
+ *
+ * @param node The node where all properties are added to
+ * @param properties The map of properties.
+ * @param ignoreProps optional list of property which should be ignored
+ * @param binPropertyName The name of the binary property.
+ * @throws RepositoryException
+ */
+ public static void addProperties(final Node node,
+ final Map<String, Object> properties,
+ final String[] ignoreProps,
+ final String binPropertyName)
+ throws RepositoryException {
+ addProperties(node, new EventPropertiesMap(properties), ignoreProps, binPropertyName);
+ }
+
+ /**
+ * Add all java properties as properties to the node.
+ * If the name and the value of a map entry can easily converted into
+ * a repository property, it is directly added. All other java
+ * properties are stored in one binary property.
+ *
+ * @param node The node where all properties are added to
+ * @param properties The map of properties.
+ * @param ignoreProps optional list of property which should be ignored
+ * @param binPropertyName The name of the binary property.
+ * @throws RepositoryException
+ */
+ public static void addProperties(final Node node,
+ final EventPropertiesMap properties,
+ final String[] ignoreProps,
+ final String binPropertyName)
+ throws RepositoryException {
+ if ( properties != null ) {
+ final List<String> ignorePropList = (ignoreProps == null ? null : Arrays.asList(ignoreProps));
+ // check which props we can write directly and
+ // which we need to write as a binary blob
+ final List<String> propsAsBlob = new ArrayList<String>();
+
+ final Iterator<Map.Entry<String, Object>> i = properties.entrySet().iterator();
+ while ( i.hasNext() ) {
+ final Map.Entry<String, Object> current = i.next();
+
+ if (ignorePropList == null || !ignorePropList.contains(current.getKey()) ) {
+ // sanity check
+ if ( current.getValue() != null ) {
+ if ( !setProperty(current.getKey(), current.getValue(), node) ) {
+ propsAsBlob.add(current.getKey());
+ }
+ }
+ }
+ }
+ // write the remaining properties as a blob
+ if ( propsAsBlob.size() > 0 ) {
+ try {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeInt(propsAsBlob.size());
+ for(final String propName : propsAsBlob) {
+ oos.writeObject(propName);
+ try {
+ oos.writeObject(properties.get(propName));
+ } catch (IOException ioe) {
+ throw new RepositoryException("Unable to serialize property " + propName, ioe);
+ }
+ }
+ oos.close();
+ node.setProperty(binPropertyName, new ByteArrayInputStream(baos.toByteArray()));
+ } catch (IOException ioe) {
+ throw new RepositoryException("Unable to serialize properties " + properties, ioe);
+ }
+ }
+ }
+ }
+
+ /**
+ * Read properties from a repository node and create a property map.
+ * @throws RepositoryException
+ * @throws ClassNotFoundException
+ */
+ public static EventPropertiesMap readProperties(final Node node,
+ final String binPropertyName,
+ final String[] ignorePrefixes)
+ throws RepositoryException, ClassNotFoundException {
+ final Map<String, Object> properties = new HashMap<String, Object>();
+
+ // check the properties blob
+ if ( node.hasProperty(binPropertyName) ) {
+ try {
+ final ObjectInputStream ois = new ObjectInputStream(node.getProperty(binPropertyName).getStream());
+ int length = ois.readInt();
+ for(int i=0;i<length;i++) {
+ final String key = (String)ois.readObject();
+ final Object value = ois.readObject();
+ properties.put(key, value);
+ }
+ } catch (java.io.InvalidClassException ice) {
+ throw new ClassNotFoundException("Found invalid class.", ice);
+ } catch (IOException ioe) {
+ throw new RepositoryException("Unable to deserialize event properties.", ioe);
+ }
+ }
+ // now all properties that have been set directly
+ final PropertyIterator pI = node.getProperties();
+ while ( pI.hasNext() ) {
+ final Property p = pI.nextProperty();
+ boolean ignore = p.getName().startsWith("jcr:");
+ if ( !ignore && ignorePrefixes != null ) {
+ int index = 0;
+ while ( !ignore && index < ignorePrefixes.length ) {
+ ignore = p.getName().startsWith(ignorePrefixes[index]);
+ index++;
+ }
+ }
+ if ( !ignore ) {
+ final String name = ISO9075.decode(p.getName());
+ if ( p.getDefinition().isMultiple() ) {
+ final Value[] values = p.getValues();
+ if ( values.length > 0 ) {
+ // get first value
+ final Object firstObject = getPropertyValue(values[0]);
+ final Object[] array;
+ if ( firstObject instanceof Boolean ) {
+ array = new Boolean[values.length];
+ } else if ( firstObject instanceof Calendar ) {
+ array = new Calendar[values.length];
+ } else if ( firstObject instanceof Double ) {
+ array = new Double[values.length];
+ } else if ( firstObject instanceof Long ) {
+ array = new Long[values.length];
+ } else {
+ array = new String[values.length];
+ }
+ array[0] = firstObject;
+ int index = 1;
+ while ( index < values.length ) {
+ array[index] = getPropertyValue(values[index]);
+ index++;
+ }
+ properties.put(name, array);
+ }
+ } else {
+ final Value value = p.getValue();
+ final Object o = getPropertyValue(value);
+ properties.put(name, o);
+ }
+ }
+ }
+ return new EventPropertiesMap(properties);
+ }
+
+ /**
+ * Return the converted repository property name
+ * @param name The java object property name
+ * @return The converted name or null if not possible.
+ */
+ public static String getNodePropertyName(final String name) {
+ // if name contains a colon, we can't set it as a property
+ if ( name.indexOf(':') != -1 ) {
+ return null;
+ }
+ return ISO9075.encode(name);
+ }
+
+ /**
+ * Return the converted repository property value
+ * @param valueFactory The value factory
+ * @param eventValue The event value
+ * @return The converted value or null if not possible
+ */
+ public static Value getNodePropertyValue(final ValueFactory valueFactory, final Object eventValue) {
+ final Value val;
+ if (eventValue instanceof Calendar) {
+ val = valueFactory.createValue((Calendar)eventValue);
+ } else if (eventValue instanceof Long) {
+ val = valueFactory.createValue((Long)eventValue);
+ } else if (eventValue instanceof Double) {
+ val = valueFactory.createValue(((Double)eventValue).doubleValue());
+ } else if (eventValue instanceof Boolean) {
+ val = valueFactory.createValue((Boolean) eventValue);
+ } else if (eventValue instanceof String) {
+ val = valueFactory.createValue((String)eventValue);
+ } else {
+ val = null;
+ }
+ return val;
+ }
+
+ /**
+ * Convert the value back to an object.
+ * @param value
+ * @return
+ * @throws RepositoryException
+ */
+ private static Object getPropertyValue(final Value value)
+ throws RepositoryException {
+ final Object o;
+ switch (value.getType()) {
+ case PropertyType.BOOLEAN:
+ o = value.getBoolean(); break;
+ case PropertyType.DATE:
+ o = value.getDate(); break;
+ case PropertyType.DOUBLE:
+ o = value.getDouble(); break;
+ case PropertyType.LONG:
+ o = value.getLong(); break;
+ case PropertyType.STRING:
+ o = value.getString(); break;
+ default: // this should never happen - we convert to a string...
+ o = value.getString();
+ }
+ return o;
+ }
+
+ /**
+ * Try to set the java property as a property of the node.
+ * @param name
+ * @param value
+ * @param node
+ * @return
+ * @throws RepositoryException
+ */
+ private static boolean setProperty(String name, Object value, Node node)
+ throws RepositoryException {
+ final String propName = getNodePropertyName(name);
+ if ( propName == null ) {
+ return false;
+ }
+ final ValueFactory fac = node.getSession().getValueFactory();
+ // check for multi value
+ if ( value.getClass().isArray() ) {
+ final Object[] array = (Object[])value;
+ // now we try to convert each value
+ // and check if all converted values have the same type
+ final Value[] values = new Value[array.length];
+ int index = 0;
+ for(final Object v : array ) {
+ values[index] = getNodePropertyValue(fac, v);
+ if ( values[index] == null ) {
+ return false;
+ }
+ if ( index > 0 && !values[index-1].getClass().equals(values[index].getClass()) ) {
+ return false;
+ }
+ index++;
+ }
+ node.setProperty(propName, values);
+ return true;
+ }
+ final Value val = getNodePropertyValue(fac, value);
+ if ( val != null ) {
+ node.setProperty(propName, val);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Improved toString method for an Event.
+ * This method prints out the event topic and all of the properties.
+ */
+ public static String toString(final Event e) {
+ if ( e == null ) {
+ return "<null>";
+ }
+ final StringBuffer buffer =new StringBuffer(e.getClass().getName());
+ buffer.append(" [topic=");
+ buffer.append(e.getTopic());
+ buffer.append(", properties=");
+ final String[] names = e.getPropertyNames();
+ if ( names != null ) {
+ for(int i=0;i<names.length;i++) {
+ if ( i>0) {
+ buffer.append(",");
+ }
+ buffer.append(names[i]);
+ buffer.append('=');
+ buffer.append(e.getProperty(names[i]));
+ }
+ }
+ buffer.append("]");
+ return buffer.toString();
+ }
+}
diff --git a/src/main/java/org/apache/sling/event/JobProcessor.java b/src/main/java/org/apache/sling/event/JobProcessor.java
new file mode 100644
index 0000000..4f410bc
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/JobProcessor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event;
+
+import org.osgi.service.event.Event;
+
+/**
+ * A job processor processes a job in the background.
+ * It is used by {@link EventUtil#processJob(Event, JobProcessor)}.
+ */
+public interface JobProcessor {
+
+ /**
+ * Execute the job.
+ * If the job fails with a thrown exception/throwable, the process will not be rescheduled.
+ *
+ * @param job The event containing the job description.
+ * @return True if the job could be finished (either successful or by an error).
+ * Return false if the job should be rescheduled.
+ */
+ boolean process(Event job);
+}
diff --git a/src/main/java/org/apache/sling/event/JobStatusProvider.java b/src/main/java/org/apache/sling/event/JobStatusProvider.java
new file mode 100644
index 0000000..e830323
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/JobStatusProvider.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.osgi.service.event.Event;
+
+/**
+ * This service provides the current job processing status.
+ */
+public interface JobStatusProvider {
+
+ /**
+ * This is a unique identifer which can be used to cancel the job.
+ */
+ String PROPERTY_EVENT_ID = "slingevent:eventId";
+
+ /**
+ * @deprecated Use {@link #getScheduledJobs(String)} instead.
+ */
+ @Deprecated
+ Collection<Event> scheduledJobs(String topic);
+
+ /**
+ * Return a list of currently schedulded jobs.
+ * @param topic Topic can be used as a filter, if it is non-null, only jobs with this topic will be returned.
+ * @return A non null collection.
+ */
+ Collection<Event> getScheduledJobs(String topic);
+
+ /**
+ * Return the jobs which are currently in processing. If there are several application nodes
+ * in the cluster, there could be more than one job in processing
+ * @param topic Topic can be used as a filter, if it is non-null, only jobs with this topic will be returned.
+ * @return A non null collection.
+ */
+ Collection<Event> getCurrentJobs(String topic);
+
+ /**
+ * Return a list of currently schedulded jobs.
+ * @param topic Topic can be used as a filter, if it is non-null, only jobs with this topic will be returned.
+ * @param filterProps A list of filter property maps. Each map acts like a template. The searched job
+ * must match the template (AND query). By providing several maps, different filters
+ * are possible (OR query).
+ * @return A non null collection.
+ */
+ Collection<Event> getScheduledJobs(String topic, Map<String, Object>... filterProps);
+
+ /**
+ * Return the jobs which are currently in processing. If there are several application nodes
+ * in the cluster, there could be more than one job in processing
+ * @param topic Topic can be used as a filter, if it is non-null, only jobs with this topic will be returned.
+ * @param filterProps A list of filter property maps. Each map acts like a template. The searched job
+ * must match the template (AND query). By providing several maps, different filters
+ * are possible (OR query).
+ * @return A non null collection.
+ */
+ Collection<Event> getCurrentJobs(String topic, Map<String, Object>... filterProps);
+
+ /**
+ * Return all jobs either running or scheduled.
+ * This is actually a convenience method and collects the results from {@link #getScheduledJobs(String, Map...)}
+ * and {@link #getCurrentJobs(String, Map...)}
+ * @param topic Topic can be used as a filter, if it is non-null, only jobs with this topic will be returned.
+ * @param filterProps A list of filter property maps. Each map acts like a template. The searched job
+ * must match the template (AND query). By providing several maps, different filters
+ * are possible (OR query).
+ * @return A non null collection.
+ */
+ Collection<Event> getAllJobs(String topic, Map<String, Object>... filterProps);
+
+ /**
+ * Cancel this job.
+ * @param jobId The unique identifer as found in the property {@link #PROPERTY_EVENT_ID}.
+ */
+ void cancelJob(String jobId);
+
+ /**
+ * Cancel this job.
+ * This method can be used if the topic and the provided job id is known.
+ * @param topic The job topic as put into the property {@link EventUtil#PROPERTY_JOB_TOPIC}.
+ * @param jobId The unique identifer as put into the property {@link EventUtil#PROPERTY_JOB_ID}.
+ */
+ void cancelJob(String topic, String jobId);
+
+ /**
+ * Wake up the named job queue.
+ * If a job failed, the job queue waits (sleeps) for a configured time. By calling this
+ * method, the job queue can be woken up and force an immediate reprocessing.
+ * @param jobQueueName The name of the queue.
+ */
+ void wakeUpJobQueue(final String jobQueueName);
+}
diff --git a/src/main/java/org/apache/sling/event/ThreadPool.java b/src/main/java/org/apache/sling/event/ThreadPool.java
new file mode 100644
index 0000000..1cca2c2
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/ThreadPool.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sling.event;
+
+/**
+ * The eventing thread pool is a special thread pool used for the eventing.
+ * The eventing uses a service registered as this interface.
+ * The default implementation is a configurable pool registered with
+ * commons threads.
+ *
+ * @version $Id$
+ */
+public interface ThreadPool extends org.apache.sling.commons.threads.ThreadPool {
+
+ // this is just a marker interface
+}
diff --git a/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java b/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java
new file mode 100644
index 0000000..ab931ae
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.osgi.service.event.Event;
+
+/**
+ * This service provides the current timed events status.
+ */
+public interface TimedEventStatusProvider {
+
+ /**
+ * This is a unique identifer which can be used to cancel the job.
+ */
+ String PROPERTY_EVENT_ID = "slingevent:eventId";
+
+ /**
+ * Return a list of currently schedulded events.
+ * @param topic Topic can be used as a filter, if it is non-null, only jobs with this topic will be returned.
+ * @param filterProps A list of filter property maps. Each map acts like a template. The searched event
+ * must match the template (AND query). By providing several maps, different filters
+ * are possible (OR query).
+ * @return A non null collection.
+ */
+ Collection<Event> getScheduledEvents(String topic, Map<String, Object>... filterProps);
+
+ /**
+ * Return the scheduled event with the given id.
+ * @return The scheduled event or null.
+ */
+ Event getScheduledEvent(String topic, String eventId, String jobId);
+
+ /**
+ * Cancel this timed event.
+ * @param jobId The unique identifer as found in the property {@link #PROPERTY_EVENT_ID}.
+ */
+ void cancelTimedEvent(String jobId);
+}
diff --git a/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java b/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
new file mode 100644
index 0000000..d754584
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Dictionary;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jcr.Node;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.observation.EventListener;
+
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.engine.SlingSettingsService;
+import org.apache.sling.event.EventPropertiesMap;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.JobStatusProvider;
+import org.apache.sling.event.ThreadPool;
+import org.apache.sling.jcr.api.SlingRepository;
+import org.apache.sling.jcr.resource.JcrResourceUtil;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for all event handlers in this package.
+ *
+ * @scr.component abstract="true" metatype="no"
+ * @scr.service interface="org.osgi.service.event.EventHandler"
+ */
+public abstract class AbstractRepositoryEventHandler
+ implements EventHandler, EventListener {
+
+ /** Default log. */
+ protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** @scr.property valueRef="DEFAULT_PROPERTY_REPO_PATH" */
+ protected static final String CONFIG_PROPERTY_REPO_PATH = "repository.path";
+
+ /** Default path for the {@link #CONFIG_PROPERTY_REPO_PATH} */
+ private static final String DEFAULT_PROPERTY_REPO_PATH = "/sling/events";
+
+ /** @scr.reference */
+ protected SlingRepository repository;
+
+ /** @scr.reference */
+ protected EventAdmin eventAdmin;
+
+ /** Our application id. */
+ protected String applicationId;
+
+ /** The repository session to write into the repository. */
+ protected Session writerSession;
+
+ /** The path in the repository. */
+ protected String repositoryPath;
+
+ /** Is the background task still running? */
+ protected boolean running;
+
+ /** A local queue for serialising the event processing. */
+ protected final BlockingQueue<EventInfo> queue = new LinkedBlockingQueue<EventInfo>();
+
+ /** A local queue for writing received events into the repository. */
+ protected final BlockingQueue<Event> writeQueue = new LinkedBlockingQueue<Event>();
+
+ /**
+ * Our thread pool.
+ * @scr.reference */
+ protected ThreadPool threadPool;
+
+ /** @scr.reference
+ * Sling settings service. */
+ protected SlingSettingsService settingsService;
+
+ public static String APPLICATION_ID;
+
+ /** List of ignored properties to write to the repository. */
+ private static final String[] IGNORE_PROPERTIES = new String[] {
+ EventUtil.PROPERTY_DISTRIBUTE,
+ EventUtil.PROPERTY_APPLICATION,
+ JobStatusProvider.PROPERTY_EVENT_ID,
+ EventUtil.JobStatusNotifier.CONTEXT_PROPERTY_NAME
+ };
+
+ /** List of ignored prefixes to read from the repository. */
+ private static final String[] IGNORE_PREFIXES = new String[] {
+ EventHelper.EVENT_PREFIX
+ };
+
+ /**
+ * Activate this component.
+ * @param context
+ * @throws RepositoryException
+ */
+ protected void activate(final ComponentContext context)
+ throws Exception {
+ this.applicationId = this.settingsService.getSlingId();
+ APPLICATION_ID = this.applicationId;
+ this.repositoryPath = OsgiUtil.toString(context.getProperties().get(
+ CONFIG_PROPERTY_REPO_PATH), DEFAULT_PROPERTY_REPO_PATH);
+
+ this.running = true;
+ // start writer thread
+ this.threadPool.execute(new Runnable() {
+ public void run() {
+ try {
+ startWriterSession();
+ } catch (RepositoryException e) {
+ // there is nothing we can do except log!
+ logger.error("Error during session starting.", e);
+ running = false;
+ }
+ try {
+ processWriteQueue();
+ } catch (Throwable t) {
+ logger.error("Writer thread stopped with exception: " + t.getMessage(), t);
+ running = false;
+ }
+ stopWriterSession();
+ }
+ });
+ this.threadPool.execute(new Runnable() {
+ public void run() {
+ try {
+ runInBackground();
+ } catch (Throwable t) {
+ logger.error("Background thread stopped with exception: " + t.getMessage(), t);
+ running = false;
+ }
+ }
+ });
+ }
+
+ protected abstract void runInBackground() throws RepositoryException;
+
+ protected abstract void processWriteQueue();
+
+ /**
+ * Deactivate this component.
+ * @param context
+ */
+ protected void deactivate(final ComponentContext context) {
+ // stop background threads by putting empty objects into the queue
+ this.running = false;
+ try {
+ this.writeQueue.put(new Event("some", null));
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ try {
+ this.queue.put(new EventInfo());
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ }
+
+ /**
+ * Create a new session.
+ * @return
+ * @throws RepositoryException
+ */
+ protected Session createSession()
+ throws RepositoryException {
+ final SlingRepository repo = this.repository;
+ if ( repo == null ) {
+ throw new RepositoryException("Repository is currently not available.");
+ }
+ return repo.loginAdministrative(null);
+ }
+
+ /**
+ * Start the repository session and add this handler as an observer
+ * for new events created on other nodes.
+ * @throws RepositoryException
+ */
+ protected void startWriterSession() throws RepositoryException {
+ this.writerSession = this.createSession();
+ if ( this.repositoryPath != null ) {
+ this.ensureRepositoryPath();
+ }
+ }
+
+ /**
+ * Stop the session.
+ */
+ protected void stopWriterSession() {
+ if ( this.writerSession != null ) {
+ try {
+ this.writerSession.getWorkspace().getObservationManager().removeEventListener(this);
+ } catch (RepositoryException e) {
+ // we just ignore it
+ this.logger.warn("Unable to remove event listener.", e);
+ }
+ this.writerSession.logout();
+ this.writerSession = null;
+ }
+ }
+
+ /**
+ * Check if the repository path already exists. If not, create it.
+ */
+ protected Node ensureRepositoryPath()
+ throws RepositoryException {
+ final Node node = JcrResourceUtil.createPath(this.repositoryPath,
+ EventHelper.NODETYPE_FOLDER,
+ EventHelper.NODETYPE_ORDERED_FOLDER,
+ this.writerSession, true);
+
+ return node;
+ }
+
+ /**
+ * Return the node type for the event.
+ */
+ protected String getEventNodeType() {
+ return EventHelper.EVENT_NODE_TYPE;
+ }
+
+ /**
+ * Write an event to the repository.
+ * @param e The event
+ * @param suggestName A suggest name/path for the node.
+ * @throws RepositoryException
+ * @throws IOException
+ */
+ protected Node writeEvent(Event e, String suggestedName)
+ throws RepositoryException {
+ // create new node with name of topic
+ final Node rootNode = this.ensureRepositoryPath();
+
+ final String nodeType = this.getEventNodeType();
+ final String nodeName;
+ if ( suggestedName != null ) {
+ nodeName = suggestedName;
+ } else {
+ final Calendar now = Calendar.getInstance();
+ final int sepPos = nodeType.indexOf(':');
+ nodeName = nodeType.substring(sepPos+1) + "-" + this.applicationId + "-" + now.getTime().getTime();
+ }
+ final Node eventNode = JcrResourceUtil.createPath(rootNode,
+ nodeName,
+ EventHelper.NODETYPE_FOLDER,
+ nodeType, false);
+
+ eventNode.setProperty(EventHelper.NODE_PROPERTY_CREATED, Calendar.getInstance());
+ eventNode.setProperty(EventHelper.NODE_PROPERTY_TOPIC, e.getTopic());
+ eventNode.setProperty(EventHelper.NODE_PROPERTY_APPLICATION, this.applicationId);
+
+ EventUtil.addProperties(eventNode,
+ new EventPropertiesMap(e),
+ IGNORE_PROPERTIES,
+ EventHelper.NODE_PROPERTY_PROPERTIES);
+ this.addNodeProperties(eventNode, e);
+ rootNode.save();
+
+ return eventNode;
+ }
+
+ /**
+ * Read an event from the repository.
+ * @return
+ * @throws RepositoryException
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ protected Event readEvent(Node eventNode)
+ throws RepositoryException, ClassNotFoundException {
+ final String topic = eventNode.getProperty(EventHelper.NODE_PROPERTY_TOPIC).getString();
+ final EventPropertiesMap eventProps = EventUtil.readProperties(eventNode,
+ EventHelper.NODE_PROPERTY_PROPERTIES,
+ IGNORE_PREFIXES);
+
+ eventProps.put(JobStatusProvider.PROPERTY_EVENT_ID, eventNode.getPath());
+ this.addEventProperties(eventNode, eventProps);
+ try {
+ final Event event = new Event(topic, eventProps);
+ return event;
+ } catch (IllegalArgumentException iae) {
+ // this exception occurs if the topic is not correct (it should never happen,
+ // but you never know)
+ throw new RepositoryException("Unable to read event: " + iae.getMessage(), iae);
+ }
+ }
+
+ /**
+ * Add properties from the node to the event properties.
+ * @param eventNode The repository node.
+ * @param properties The event properties.
+ * @throws RepositoryException
+ */
+ protected void addEventProperties(Node eventNode, Dictionary<String, Object> properties)
+ throws RepositoryException {
+ // nothing to do
+ }
+
+ /**
+ * Add properties when storing event in repository.
+ * This method can be enhanced by sub classes.
+ * @param eventNode
+ * @param event
+ * @throws RepositoryException
+ */
+ protected void addNodeProperties(Node eventNode, Event event)
+ throws RepositoryException {
+ // nothing to do here
+ }
+
+ /**
+ * Helper method which just logs the exception in debug mode.
+ * @param e
+ */
+ protected void ignoreException(Exception e) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Ignore exception " + e.getMessage(), e);
+ }
+ }
+
+ protected static final class EventInfo {
+ public String nodePath;
+ public Event event;
+ }
+
+}
diff --git a/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java b/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
new file mode 100644
index 0000000..4e07571
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.util.Calendar;
+import java.util.Dictionary;
+
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.query.Query;
+
+import org.apache.jackrabbit.util.ISO8601;
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.event.EventUtil;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+/**
+ * This event handler distributes events across an application cluster.
+ * @scr.component inherit="true" label="%dist.events.name" description="%dist.events.description"
+ * @scr.property name="event.topics" value="*" private="true"
+ * @scr.property name="event.filter" value="(event.distribute=*)" private="true"
+ * @scr.property name="repository.path" value="/var/eventing/distribution" private="true"
+ *
+ * We schedule this event handler to run in the background and clean up
+ * obsolete events.
+ * @scr.service interface="java.lang.Runnable"
+ * @scr.property name="scheduler.period" value="1800" type="Long"
+ * @scr.property name="scheduler.concurrent" value="false" type="Boolean" private="true"
+ */
+public class DistributingEventHandler
+ extends AbstractRepositoryEventHandler
+ implements Runnable {
+
+ /** Default clean up time is 15 minutes. */
+ protected static final int DEFAULT_CLEANUP_PERIOD = 15;
+
+ /** @scr.property valueRef="DEFAULT_CLEANUP_PERIOD" type="Integer" */
+ protected static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
+
+ /** We remove everything which is older than 15min by default. */
+ protected int cleanupPeriod = DEFAULT_CLEANUP_PERIOD;
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#activate(org.osgi.service.component.ComponentContext)
+ */
+ protected void activate(ComponentContext context)
+ throws Exception {
+ @SuppressWarnings("unchecked")
+ final Dictionary<String, Object> props = context.getProperties();
+ this.cleanupPeriod = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD);
+ super.activate(context);
+ }
+
+ /**
+ * Return the query string for the clean up.
+ */
+ protected String getCleanUpQueryString() {
+ final Calendar deleteBefore = Calendar.getInstance();
+ deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
+ final String dateString = ISO8601.format(deleteBefore);
+
+ final StringBuffer buffer = new StringBuffer("/jcr:root");
+ buffer.append(this.repositoryPath);
+ buffer.append("//element(*, ");
+ buffer.append(getEventNodeType());
+ buffer.append(")[@");
+ buffer.append(EventHelper.NODE_PROPERTY_CREATED);
+ buffer.append(" < xs:dateTime('");
+ buffer.append(dateString);
+ buffer.append("')]");
+
+ return buffer.toString();
+ }
+
+ /**
+ * This method is invoked periodically.
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ if ( this.cleanupPeriod > 0 ) {
+ this.logger.debug("Cleaning up repository, removing all entries older than {} minutes.", this.cleanupPeriod);
+
+ final String queryString = this.getCleanUpQueryString();
+ // we create an own session for concurrency issues
+ Session s = null;
+ try {
+ s = this.createSession();
+ final Node parentNode = (Node)s.getItem(this.repositoryPath);
+ logger.debug("Executing query {}", queryString);
+ final Query q = s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH);
+ final NodeIterator iter = q.execute().getNodes();
+ int count = 0;
+ while ( iter.hasNext() ) {
+ final Node eventNode = iter.nextNode();
+ eventNode.remove();
+ count++;
+ }
+ parentNode.save();
+ logger.debug("Removed {} entries from the repository.", count);
+
+ } catch (RepositoryException e) {
+ // in the case of an error, we just log this as a warning
+ this.logger.warn("Exception during repository cleanup.", e);
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
+ }
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
+ */
+ protected void processWriteQueue() {
+ while ( this.running ) {
+ // so let's wait/get the next job from the queue
+ Event event = null;
+ try {
+ event = this.writeQueue.take();
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
+ if ( event != null && this.running ) {
+ try {
+ this.writeEvent(event, null);
+ } catch (Exception e) {
+ this.logger.error("Exception during writing the event to the repository.", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#runInBackground()
+ */
+ protected void runInBackground() {
+ while ( this.running ) {
+ // so let's wait/get the next job from the queue
+ EventInfo info = null;
+ try {
+ info = this.queue.take();
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
+ if ( info != null && this.running ) {
+ if ( info.nodePath != null) {
+ Session session = null;
+ try {
+ session = this.createSession();
+ final Node eventNode = (Node)session.getItem(info.nodePath);
+ final EventAdmin localEA = this.eventAdmin;
+ if ( localEA != null ) {
+ localEA.postEvent(this.readEvent(eventNode));
+ } else {
+ this.logger.error("Unable to post event as no event admin is available.");
+ }
+ } catch (Exception ex) {
+ this.logger.error("Exception during reading the event from the repository.", ex);
+ } finally {
+ if ( session != null ) {
+ session.logout();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+ */
+ public void handleEvent(final Event event) {
+ try {
+ this.writeQueue.put(event);
+ } catch (InterruptedException ex) {
+ // we ignore this
+ this.ignoreException(ex);
+ }
+ }
+
+ /**
+ * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
+ */
+ public void onEvent(final EventIterator iterator) {
+ while ( iterator.hasNext() ) {
+ final javax.jcr.observation.Event event = iterator.nextEvent();
+ try {
+ final EventInfo info = new EventInfo();
+ info.nodePath = event.getPath();
+ this.queue.put(info);
+ } catch (InterruptedException ex) {
+ // we ignore this
+ this.ignoreException(ex);
+ } catch (RepositoryException ex) {
+ this.logger.error("Exception during reading the event from the repository.", ex);
+ }
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#addEventProperties(javax.jcr.Node, java.util.Dictionary)
+ */
+ protected void addEventProperties(Node eventNode, Dictionary<String, Object> properties)
+ throws RepositoryException {
+ super.addEventProperties(eventNode, properties);
+ properties.put(EventUtil.PROPERTY_APPLICATION, eventNode.getProperty(EventHelper.NODE_PROPERTY_APPLICATION).getString());
+ }
+
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#startWriterSession()
+ */
+ protected void startWriterSession() throws RepositoryException {
+ super.startWriterSession();
+ this.writerSession.getWorkspace().getObservationManager()
+ .addEventListener(this, javax.jcr.observation.Event.NODE_ADDED, this.repositoryPath, true, null, new String[] {this.getEventNodeType()}, true);
+ }
+}
diff --git a/src/main/java/org/apache/sling/event/impl/EventHelper.java b/src/main/java/org/apache/sling/event/impl/EventHelper.java
new file mode 100644
index 0000000..65a386b
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/EventHelper.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+
+/**
+ * Helper class defining some constants and utility methods.
+ */
+public abstract class EventHelper {
+
+ /** The name of the thread pool for the eventing stuff. */
+ public static final String THREAD_POOL_NAME = "SLING_EVENTING";
+
+ /** The namespace prefix. */
+ public static final String EVENT_PREFIX = "slingevent:";
+
+ public static final String NODE_PROPERTY_TOPIC = "slingevent:topic";
+ public static final String NODE_PROPERTY_APPLICATION = "slingevent:application";
+ public static final String NODE_PROPERTY_CREATED = "slingevent:created";
+ public static final String NODE_PROPERTY_PROPERTIES = "slingevent:properties";
+ public static final String NODE_PROPERTY_PROCESSOR = "slingevent:processor";
+ public static final String NODE_PROPERTY_JOBID = "slingevent:id";
+ public static final String NODE_PROPERTY_FINISHED = "slingevent:finished";
+ public static final String NODE_PROPERTY_TE_EXPRESSION = "slingevent:expression";
+ public static final String NODE_PROPERTY_TE_DATE = "slingevent:date";
+ public static final String NODE_PROPERTY_TE_PERIOD = "slingevent:period";
+
+ public static final String EVENT_NODE_TYPE = "slingevent:Event";
+ public static final String JOB_NODE_TYPE = "slingevent:Job";
+ public static final String TIMED_EVENT_NODE_TYPE = "slingevent:TimedEvent";
+
+ /** The nodetype for newly created intermediate folders */
+ public static final String NODETYPE_FOLDER = "sling:Folder";
+
+ /** The nodetype for newly created folders */
+ public static final String NODETYPE_ORDERED_FOLDER = "sling:OrderedFolder";
+
+ /** Allowed characters for a node name */
+ private static final String ALLOWED_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz0123456789_,.-+*#!¤$%&()=[]?";
+ /** Replacement characters for unallowed characters in a node name */
+ private static final char REPLACEMENT_CHAR = '_';
+
+ /**
+ * Filter the node name for not allowed characters and replace them.
+ * @param nodeName The suggested node name.
+ * @return The filtered node name.
+ */
+ public static String filter(final String nodeName) {
+ final StringBuffer sb = new StringBuffer();
+ char lastAdded = 0;
+
+ for(int i=0; i < nodeName.length(); i++) {
+ final char c = nodeName.charAt(i);
+ char toAdd = c;
+
+ if (ALLOWED_CHARS.indexOf(c) < 0) {
+ if (lastAdded == REPLACEMENT_CHAR) {
+ // do not add several _ in a row
+ continue;
+ }
+ toAdd = REPLACEMENT_CHAR;
+
+ } else if(i == 0 && Character.isDigit(c)) {
+ sb.append(REPLACEMENT_CHAR);
+ }
+
+ sb.append(toAdd);
+ lastAdded = toAdd;
+ }
+
+ if (sb.length()==0) {
+ sb.append(REPLACEMENT_CHAR);
+ }
+
+ return sb.toString();
+ }
+
+}
diff --git a/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java b/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
new file mode 100644
index 0000000..ea9f91a
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.commons.threads.ThreadPoolConfig;
+import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.apache.sling.event.ThreadPool;
+import org.osgi.service.component.ComponentContext;
+
+
+/**
+ * The configurable eventing thread pool.
+ * @scr.component label="%event.pool.name" description="%event.pool.description"
+ * @scr.service interface="org.apache.sling.event.ThreadPool"
+ *
+ * @scr.property nameRef="PROPERTY_MIN_POOL_SIZE" valueRef="DEFAULT_MIN_POOL_SIZE"
+ * @scr.property nameRef="PROPERTY_MAX_POOL_SIZE" valueRef="DEFAULT_MAX_POOL_SIZE"
+ * @scr.property nameRef="PROPERTY_QUEUEL_SIZE" valueRef="DEFAULT_QUEUE_SIZE"
+ */
+public class EventingThreadPool implements ThreadPool {
+
+ /** @scr.reference */
+ protected ThreadPoolManager threadPoolManager;
+
+ /** The real thread pool used. */
+ private org.apache.sling.commons.threads.ThreadPool threadPool;
+
+ private static final String PROPERTY_MIN_POOL_SIZE = "minPoolSize";
+ private static final String PROPERTY_MAX_POOL_SIZE = "maxPoolSize";
+ private static final String PROPERTY_QUEUEL_SIZE = "queueSize";
+
+ private static final int DEFAULT_MIN_POOL_SIZE = 20; // this is sufficient for all threads + approx 10 job queues
+ private static final int DEFAULT_MAX_POOL_SIZE = 30;
+ private static final int DEFAULT_QUEUE_SIZE = 50; // queue upto 50 threads
+
+ /**
+ * Activate this component.
+ * @param context
+ */
+ protected void activate(final ComponentContext ctx) throws Exception {
+ // start background threads
+ if ( this.threadPoolManager == null ) {
+ throw new Exception("No ThreadPoolManager found.");
+ }
+ final ThreadPoolConfig config = new ThreadPoolConfig();
+ config.setMinPoolSize(OsgiUtil.toInteger(ctx.getProperties().get(PROPERTY_MIN_POOL_SIZE), DEFAULT_MIN_POOL_SIZE));
+ config.setMaxPoolSize(OsgiUtil.toInteger(ctx.getProperties().get(PROPERTY_MAX_POOL_SIZE), DEFAULT_MAX_POOL_SIZE));
+ config.setQueueSize(OsgiUtil.toInteger(ctx.getProperties().get(PROPERTY_QUEUEL_SIZE), DEFAULT_QUEUE_SIZE));
+ config.setShutdownGraceful(true);
+ threadPoolManager.create(EventHelper.THREAD_POOL_NAME, config);
+
+ this.threadPool = threadPoolManager.get(EventHelper.THREAD_POOL_NAME);
+ if ( this.threadPool == null ) {
+ throw new Exception("No thread pool with name " + EventHelper.THREAD_POOL_NAME + " found.");
+ }
+ }
+
+ /**
+ * Deactivate this component.
+ * @param context
+ */
+ protected void deactivate(final ComponentContext context) {
+ this.threadPool = null;
+ }
+
+ /**
+ * @see org.apache.sling.commons.threads.ThreadPool#execute(java.lang.Runnable)
+ */
+ public void execute(Runnable runnable) {
+ threadPool.execute(runnable);
+ }
+
+ /**
+ * @see org.apache.sling.commons.threads.ThreadPool#getConfiguration()
+ */
+ public ThreadPoolConfig getConfiguration() {
+ return threadPool.getConfiguration();
+ }
+
+ /**
+ * @see org.apache.sling.commons.threads.ThreadPool#getName()
+ */
+ public String getName() {
+ return threadPool.getName();
+ }
+
+ /**
+ * @see org.apache.sling.commons.threads.ThreadPool#shutdown()
+ */
+ public void shutdown() {
+ threadPool.shutdown();
+ }
+}
diff --git a/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java b/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
new file mode 100644
index 0000000..89a6bc3
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.sling.event.impl.AbstractRepositoryEventHandler.EventInfo;
+
+/**
+ * The job blocking queue extends the blocking queue by some
+ * functionality for the job event handling.
+ */
+public final class JobBlockingQueue extends LinkedBlockingQueue<EventInfo> {
+
+ private EventInfo eventInfo;
+
+ private final Object lock = new Object();
+
+ private boolean isWaiting = false;
+
+ private boolean markForCleanUp = false;
+
+ private boolean finished = false;
+
+ private boolean isSleeping = false;
+
+ private String schedulerJobName;
+ private Thread sleepingThread;
+
+ public EventInfo waitForFinish() throws InterruptedException {
+ this.isWaiting = true;
+ this.markForCleanUp = false;
+ this.lock.wait();
+ this.isWaiting = false;
+ final EventInfo object = this.eventInfo;
+ this.eventInfo = null;
+ return object;
+ }
+
+ public void markForCleanUp() {
+ if ( !this.isWaiting ) {
+ this.markForCleanUp = true;
+ }
+ }
+
+ public boolean isMarkedForCleanUp() {
+ return !this.isWaiting && this.markForCleanUp;
+ }
+
+ public void notifyFinish(EventInfo i) {
+ this.eventInfo = i;
+ this.lock.notify();
+ }
+
+ public Object getLock() {
+ return lock;
+ }
+
+ public boolean isWaiting() {
+ return this.isWaiting;
+ }
+
+ public boolean isFinished() {
+ return finished;
+ }
+
+ public void setFinished(boolean flag) {
+ this.finished = flag;
+ }
+
+ public void setSleeping(boolean flag) {
+ this.isSleeping = flag;
+ if ( !flag ) {
+ this.schedulerJobName = null;
+ this.sleepingThread = null;
+ }
+ }
+
+ public void setSleeping(boolean flag, String schedulerJobName) {
+ this.schedulerJobName = schedulerJobName;
+ this.setSleeping(flag);
+ }
+
+ public void setSleeping(boolean flag, Thread sleepingThread) {
+ this.sleepingThread = sleepingThread;
+ this.setSleeping(flag);
+ }
+
+ public String getSchedulerJobName() {
+ return this.schedulerJobName;
+ }
+
+ public Thread getSleepingThread() {
+ return this.sleepingThread;
+ }
+
+ public boolean isSleeping() {
+ return this.isSleeping;
+ }
+}
+
diff --git a/src/main/java/org/apache/sling/event/impl/JobEventHandler.java b/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
new file mode 100644
index 0000000..4a9b187
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
@@ -0,0 +1,1510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+
+import javax.jcr.Item;
+import javax.jcr.ItemExistsException;
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.Value;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.query.Query;
+import javax.jcr.query.QueryManager;
+
+import org.apache.jackrabbit.util.ISO8601;
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.event.EventPropertiesMap;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.JobStatusProvider;
+import org.osgi.framework.Constants;
+import org.osgi.service.component.ComponentConstants;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+
+/**
+ * An event handler for special job events.
+ *
+ * @scr.component label="%job.events.name" description="%job.events.description" immediate="true"
+ * @scr.service interface="org.apache.sling.event.JobStatusProvider"
+ * @scr.property name="event.topics" refValues="EventUtil.TOPIC_JOB"
+ * values.updated="org/osgi/framework/BundleEvent/UPDATED"
+ * values.started="org/osgi/framework/BundleEvent/STARTED"
+ * private="true"
+ * @scr.property name="repository.path" value="/var/eventing/jobs" private="true"
+ * We schedule this event handler to run in the background and clean up
+ * obsolete events.
+ * @scr.service interface="java.lang.Runnable"
+ * @scr.property name="scheduler.period" value="300" type="Long" label="%jobscheduler.period.name" description="%jobscheduler.period.description"
+ * @scr.property name="scheduler.concurrent" value="false" type="Boolean" private="true"
+ */
+public class JobEventHandler
+ extends AbstractRepositoryEventHandler
+ implements EventUtil.JobStatusNotifier, JobStatusProvider, Runnable {
+
+ /** A map for keeping track of currently processed job topics. */
+ private final Map<String, Boolean> processingMap = new HashMap<String, Boolean>();
+
+ /** A map for the different job queues. */
+ private final Map<String, JobBlockingQueue> jobQueues = new HashMap<String, JobBlockingQueue>();
+
+ /** Default sleep time. */
+ private static final long DEFAULT_SLEEP_TIME = 30;
+
+ /** @scr.property valueRef="DEFAULT_SLEEP_TIME" */
+ private static final String CONFIG_PROPERTY_SLEEP_TIME = "sleep.time";
+
+ /** Default number of job retries. */
+ private static final int DEFAULT_MAX_JOB_RETRIES = 10;
+
+ /** @scr.property valueRef="DEFAULT_MAX_JOB_RETRIES" */
+ private static final String CONFIG_PROPERTY_MAX_JOB_RETRIES = "max.job.retries";
+
+ /** Default number of seconds to wait for an ack. */
+ private static final long DEFAULT_WAIT_FOR_ACK = 90; // by default we wait 90 secs
+
+ /** @scr.property valueRef="DEFAULT_MAXIMUM_PARALLEL_JOBS" */
+ private static final String CONFIG_PROPERTY_MAXIMUM_PARALLEL_JOBS = "max.parallel.jobs";
+
+ /** Default nubmer of parallel jobs. */
+ private static final long DEFAULT_MAXIMUM_PARALLEL_JOBS = 15;
+
+ /** @scr.property valueRef="DEFAULT_WAIT_FOR_ACK" */
+ private static final String CONFIG_PROPERTY_WAIT_FOR_ACK = "wait.for.ack";
+
+ /** We check every 30 secs by default. */
+ private long sleepTime;
+
+ /** How often should a job be retried by default. */
+ private int maxJobRetries;
+
+ /** How long do we wait for an ack (in ms) */
+ private long waitForAckMs;
+
+ /** Maximum parallel running jobs for a single queue. */
+ private long maximumParallelJobs;
+
+ /** Background session. */
+ private Session backgroundSession;
+
+ /** Unloaded jobs. */
+ private Set<String>unloadedJobs = new HashSet<String>();
+
+ /** List of deleted jobs. */
+ private Set<String>deletedJobs = new HashSet<String>();
+
+ /** Default clean up time is 5 minutes. */
+ private static final int DEFAULT_CLEANUP_PERIOD = 5;
+
+ /** @scr.property valueRef="DEFAULT_CLEANUP_PERIOD" type="Integer" label="%jobcleanup.period.name" description="%jobcleanup.period.description" */
+ private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
+
+ /** We remove everything which is older than 5 min by default. */
+ private int cleanupPeriod = DEFAULT_CLEANUP_PERIOD;
+
+ /** The scheduler for rescheduling jobs. @scr.reference */
+ private Scheduler scheduler;
+
+ /** Our component context. */
+ private ComponentContext componentContext;
+
+ /** The map of events we're currently processing. */
+ private final Map<String, StartedJobInfo> processingEventsList = new HashMap<String, StartedJobInfo>();
+
+ public static ThreadPool JOB_THREAD_POOL;
+
+ /** Sync lock */
+ private final Object writeLock = new Object();
+
+ /** Sync lock */
+ private final Object backgroundLock = new Object();
+
+ /** Number of parallel jobs for the main queue. */
+ private long parallelJobCount;
+
+ /**
+ * Activate this component.
+ * @param context
+ * @throws RepositoryException
+ */
+ protected void activate(final ComponentContext context)
+ throws Exception {
+ @SuppressWarnings("unchecked")
+ final Dictionary<String, Object> props = context.getProperties();
+ this.cleanupPeriod = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD);
+ this.sleepTime = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_SLEEP_TIME), DEFAULT_SLEEP_TIME);
+ this.maxJobRetries = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAX_JOB_RETRIES), DEFAULT_MAX_JOB_RETRIES);
+ this.waitForAckMs = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_WAIT_FOR_ACK), DEFAULT_WAIT_FOR_ACK) * 1000;
+ this.maximumParallelJobs = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_MAXIMUM_PARALLEL_JOBS), DEFAULT_MAXIMUM_PARALLEL_JOBS);
+ this.componentContext = context;
+ super.activate(context);
+ JOB_THREAD_POOL = this.threadPool;
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#deactivate(org.osgi.service.component.ComponentContext)
+ */
+ protected void deactivate(final ComponentContext context) {
+ super.deactivate(context);
+ synchronized ( this.jobQueues ) {
+ final Iterator<JobBlockingQueue> i = this.jobQueues.values().iterator();
+ while ( i.hasNext() ) {
+ final JobBlockingQueue jbq = i.next();
+ // wake up qeue
+ if ( jbq.isWaiting() ) {
+ synchronized ( jbq.getLock()) {
+ jbq.notifyFinish(null);
+ }
+ }
+ // continue queue processing
+ try {
+ jbq.put(new EventInfo());
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ }
+ }
+ if ( this.backgroundSession != null ) {
+ synchronized ( this.backgroundLock ) {
+ try {
+ this.backgroundSession.getWorkspace().getObservationManager().removeEventListener(this);
+ } catch (RepositoryException e) {
+ // we just ignore it
+ this.logger.warn("Unable to remove event listener.", e);
+ }
+ this.backgroundSession.logout();
+ this.backgroundSession = null;
+ }
+ }
+ this.componentContext = null;
+ JOB_THREAD_POOL = null;
+ }
+
+ /**
+ * Return the query string for the clean up.
+ */
+ private String getCleanUpQueryString() {
+ final Calendar deleteBefore = Calendar.getInstance();
+ deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
+ final String dateString = ISO8601.format(deleteBefore);
+
+ final StringBuffer buffer = new StringBuffer("/jcr:root");
+ buffer.append(this.repositoryPath);
+ buffer.append("//element(*, ");
+ buffer.append(getEventNodeType());
+ buffer.append(")[@");
+ buffer.append(EventHelper.NODE_PROPERTY_FINISHED);
+ buffer.append(" < xs:dateTime('");
+ buffer.append(dateString);
+ buffer.append("')]");
+
+ return buffer.toString();
+ }
+
+ /**
+ * This method is invoked periodically.
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ if ( this.running ) {
+ // check for jobs that were started but never got an aknowledge
+ final long tooOld = System.currentTimeMillis() - this.waitForAckMs;
+ // to keep the synchronized block as fast as possible we just store the
+ // jobs to be removed in a new list and process this list afterwards
+ final List<StartedJobInfo> restartJobs = new ArrayList<StartedJobInfo>();
+ synchronized ( this.processingEventsList ) {
+ final Iterator<Map.Entry<String, StartedJobInfo>> i = this.processingEventsList.entrySet().iterator();
+ while ( i.hasNext() ) {
+ final Map.Entry<String, StartedJobInfo> entry = i.next();
+ if ( entry.getValue().started <= tooOld ) {
+ restartJobs.add(entry.getValue());
+ }
+ }
+ }
+ // remove obsolete jobs from the repository
+ if ( this.cleanupPeriod > 0 ) {
+ this.logger.debug("Cleaning up repository, removing all finished jobs older than {} minutes.", this.cleanupPeriod);
+
+ final String queryString = this.getCleanUpQueryString();
+ // we create an own session for concurrency issues
+ Session s = null;
+ try {
+ s = this.createSession();
+ final Node parentNode = (Node)s.getItem(this.repositoryPath);
+ logger.debug("Executing query {}", queryString);
+ final Query q = s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH);
+ final NodeIterator iter = q.execute().getNodes();
+ int count = 0;
+ while ( iter.hasNext() ) {
+ final Node eventNode = iter.nextNode();
+ eventNode.remove();
+ count++;
+ }
+ parentNode.save();
+ logger.debug("Removed {} entries from the repository.", count);
+
+ } catch (RepositoryException e) {
+ // in the case of an error, we just log this as a warning
+ this.logger.warn("Exception during repository cleanup.", e);
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
+ }
+ }
+ // restart jobs is now a list of potential candidates, we now have to check
+ // each candidate separately again!
+ if ( restartJobs.size() > 0 ) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ // we just ignore this
+ e.printStackTrace();
+ }
+ }
+ final Iterator<StartedJobInfo> jobIter = restartJobs.iterator();
+ while ( jobIter.hasNext() ) {
+ final StartedJobInfo info = jobIter.next();
+ boolean process = false;
+ synchronized ( this.processingEventsList ) {
+ process = this.processingEventsList.remove(info.nodePath) != null;
+ }
+ if ( process ) {
+ this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", info.event, info.nodePath);
+ this.finishedJob(info.event, info.nodePath, true);
+ }
+ }
+
+ // check for idle threads
+ synchronized ( this.jobQueues ) {
+ final Iterator<Map.Entry<String, JobBlockingQueue>> i = this.jobQueues.entrySet().iterator();
+ while ( i.hasNext() ) {
+ final Map.Entry<String, JobBlockingQueue> current = i.next();
+ final JobBlockingQueue jbq = current.getValue();
+ if ( jbq.size() == 0 ) {
+ if ( jbq.isMarkedForCleanUp() ) {
+ // set to finished
+ jbq.setFinished(true);
+ // wake up
+ try {
+ jbq.put(new EventInfo());
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ // remove
+ i.remove();
+ } else {
+ // mark to be removed during next cycle
+ jbq.markForCleanUp();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
+ */
+ protected void processWriteQueue() {
+ while ( this.running ) {
+ // so let's wait/get the next job from the queue
+ Event event = null;
+ try {
+ event = this.writeQueue.take();
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
+ if ( event != null && this.running ) {
+ logger.debug("Persisting job {}", event);
+ try {
+ this.writerSession.refresh(false);
+ } catch (RepositoryException re) {
+ // we just ignore this
+ this.ignoreException(re);
+ }
+ final EventInfo info = new EventInfo();
+ info.event = event;
+ final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
+ final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+ final String nodePath = this.getNodePath(jobTopic, jobId);
+
+ // if the job has no job id, we can just write the job to the repo and don't
+ // need locking
+ if ( jobId == null ) {
+ try {
+ final Node eventNode = this.writeEvent(event, nodePath);
+ info.nodePath = eventNode.getPath();
+ } catch (RepositoryException re ) {
+ // something went wrong, so let's log it
+ this.logger.error("Exception during writing new job '" + EventUtil.toString(event) + "' to repository at " + nodePath, re);
+ }
+ } else {
+ try {
+ // let's first search for an existing node with the same id
+ final Node parentNode = this.ensureRepositoryPath();
+ Node foundNode = null;
+ if ( parentNode.hasNode(nodePath) ) {
+ foundNode = parentNode.getNode(nodePath);
+ }
+ if ( foundNode != null ) {
+ // if the node is locked, someone else was quicker
+ // and we don't have to process this job
+ if ( !foundNode.isLocked() ) {
+ // node is already in repository, so if not finished we just use it
+ // otherwise it has already been processed
+ try {
+ if ( !foundNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED) ) {
+ info.nodePath = foundNode.getPath();
+ }
+ } catch (RepositoryException re) {
+ // if anything goes wrong, it means that (hopefully) someone
+ // else is processing this node
+ }
+ }
+ } else {
+ // We now write the event into the repository
+ try {
+ final Node eventNode = this.writeEvent(event, nodePath);
+ info.nodePath = eventNode.getPath();
+ } catch (ItemExistsException iee) {
+ // someone else did already write this node in the meantime
+ // nothing to do for us
+ }
+ }
+ } catch (RepositoryException re ) {
+ // something went wrong, so let's log it
+ this.logger.error("Exception during writing new job '" + event + "' to repository at " + nodePath, re);
+ }
+ }
+ // if we were able to write the event into the repository
+ // we will queue it for processing
+ if ( info.nodePath != null ) {
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * This method runs in the background and processes the local queue.
+ */
+ protected void runInBackground() throws RepositoryException {
+ this.backgroundSession = this.createSession();
+ this.backgroundSession.getWorkspace().getObservationManager()
+ .addEventListener(this,
+ javax.jcr.observation.Event.PROPERTY_REMOVED
+ |javax.jcr.observation.Event.NODE_REMOVED,
+ this.repositoryPath,
+ true,
+ null,
+ new String[] {this.getEventNodeType()},
+ true);
+ // give the system some time to start
+ try {
+ Thread.sleep(1000 * 30); // 30 secs
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ // load unprocessed jobs from repository
+ if ( this.running ) {
+ this.loadJobs();
+ } else {
+ final ComponentContext ctx = this.componentContext;
+ // deactivate
+ if ( ctx != null ) {
+ logger.info("Deactivating component {} due to errors during startup.", ctx.getProperties().get(Constants.SERVICE_ID));
+ final String name = (String) componentContext.getProperties().get(
+ ComponentConstants.COMPONENT_NAME);
+ ctx.disableComponent(name);
+ }
+ }
+ while ( this.running ) {
+ // so let's wait/get the next job from the queue
+ EventInfo info = null;
+ try {
+ info = this.queue.take();
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
+
+ if ( info != null && this.running ) {
+ // check for local only jobs and remove them from the queue if they're meant
+ // for another application node
+ final String appId = (String)info.event.getProperty(EventUtil.PROPERTY_APPLICATION);
+ if ( info.event.getProperty(EventUtil.PROPERTY_JOB_RUN_LOCAL) != null
+ && appId != null && !this.applicationId.equals(appId) ) {
+ info = null;
+ }
+
+ // check if we should put this into a separate queue
+ if ( info != null && info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
+ final String queueName = (String)info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME);
+ synchronized ( this.jobQueues ) {
+ BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName);
+ if ( jobQueue == null ) {
+ final JobBlockingQueue jq = new JobBlockingQueue();
+ jobQueue = jq;
+ this.jobQueues.put(queueName, jq);
+ // Start background thread
+ this.threadPool.execute(new Runnable() {
+
+ /**
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ while ( running && !jq.isFinished() ) {
+ logger.info("Starting job queue {}", queueName);
+ try {
+ runJobQueue(queueName, jq);
+ } catch (Throwable t) {
+ logger.error("Job queue stopped with exception: " + t.getMessage() + ". Restarting.", t);
+ }
+ }
+ }
+
+ });
+ }
+ try {
+ jobQueue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ }
+ // don't process this here
+ info = null;
+ }
+
+ // if we still have a job, process it
+ if ( info != null ) {
+ this.executeJob(info, null);
+ }
+ }
+ }
+ }
+
+ /**
+ * Execute a job queue
+ * @param queueName The name of the job queue
+ * @param jobQueue The job queue
+ */
+ private void runJobQueue(final String queueName, final JobBlockingQueue jobQueue) {
+ EventInfo info = null;
+ while ( this.running && !jobQueue.isFinished() ) {
+ if ( info == null ) {
+ // so let's wait/get the next job from the queue
+ try {
+ info = jobQueue.take();
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
+ }
+
+ if ( info != null && this.running && !jobQueue.isFinished() ) {
+ synchronized ( jobQueue.getLock()) {
+ final EventInfo processInfo = info;
+ info = null;
+ if ( this.executeJob(processInfo, jobQueue) ) {
+ EventInfo newInfo = null;
+ try {
+ newInfo = jobQueue.waitForFinish();
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ // if we have an info, this is a reschedule
+ if ( newInfo != null ) {
+ final EventInfo newEventInfo = newInfo;
+ final Event job = newInfo.event;
+
+ // is this an ordered queue?
+ final boolean orderedQueue = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
+
+ if ( orderedQueue ) {
+ // we just sleep for the delay time - if none, we continue and retry
+ // this job again
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+ final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+ jobQueue.setSleeping(true, Thread.currentThread());
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ } finally {
+ jobQueue.setSleeping(false);
+ }
+ }
+ info = newInfo;
+ } else {
+ // delay rescheduling?
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+ final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+ final Date fireDate = new Date();
+ fireDate.setTime(System.currentTimeMillis() + delay);
+
+ final String schedulerJobName = "Waiting:" + queueName;
+ final Runnable t = new Runnable() {
+ public void run() {
+ jobQueue.setSleeping(true, schedulerJobName);
+ try {
+ jobQueue.put(newEventInfo);
+ } catch (InterruptedException e) {
+ // this should never happen
+ ignoreException(e);
+ } finally {
+ jobQueue.setSleeping(false);
+ }
+ }
+ };
+ try {
+ this.scheduler.fireJobAt(schedulerJobName, t, null, fireDate);
+ } catch (Exception e) {
+ // we ignore the exception and just put back the job in the queue
+ ignoreException(e);
+ t.run();
+ }
+ } else {
+ // put directly into queue
+ try {
+ jobQueue.put(newInfo);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Process a job
+ */
+ private boolean executeJob(final EventInfo info, final BlockingQueue<EventInfo> jobQueue) {
+ boolean putback = false;
+ boolean wait = false;
+ synchronized (this.backgroundLock) {
+ try {
+ this.backgroundSession.refresh(false);
+ // check if the node still exists
+ if ( this.backgroundSession.itemExists(info.nodePath)
+ && !this.backgroundSession.itemExists(info.nodePath + "/" + EventHelper.NODE_PROPERTY_FINISHED)) {
+ final Event event = info.event;
+ final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+ final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null
+ || event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+
+ // check how we can process this job
+ // if parallel processing is allowed, we can just process
+ // if not we should check if any other job with the same topic is currently running
+ boolean process = parallelProcessing;
+ if ( !parallelProcessing ) {
+ synchronized ( this.processingMap ) {
+ final Boolean value = this.processingMap.get(jobTopic);
+ if ( value == null || !value.booleanValue() ) {
+ this.processingMap.put(jobTopic, Boolean.TRUE);
+ process = true;
+ }
+ }
+
+ } else {
+ // check number of parallel jobs for main queue
+ if ( jobQueue == null && this.parallelJobCount >= this.maximumParallelJobs ) {
+ process = false;
+ wait = true;
+ }
+ }
+ if ( process ) {
+ boolean unlock = true;
+ try {
+ final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
+ if ( !eventNode.isLocked() ) {
+ // lock node
+ try {
+ eventNode.lock(false, true);
+ } catch (RepositoryException re) {
+ // lock failed which means that the node is locked by someone else, so we don't have to requeue
+ process = false;
+ }
+ if ( process ) {
+ unlock = false;
+ this.processJob(info.event, eventNode, jobQueue == null);
+ return true;
+ }
+ }
+ } catch (RepositoryException e) {
+ // ignore
+ this.ignoreException(e);
+ } finally {
+ if ( unlock && !parallelProcessing ) {
+ synchronized ( this.processingMap ) {
+ this.processingMap.put(jobTopic, Boolean.FALSE);
+ }
+ }
+ }
+ } else {
+ try {
+ // check if the node is in processing or already finished
+ final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
+ if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
+ putback = true;
+ }
+ } catch (RepositoryException e) {
+ // ignore
+ this.ignoreException(e);
+ }
+ }
+ }
+ } catch (RepositoryException re) {
+ this.ignoreException(re);
+ }
+
+ }
+ // if this is the main queue and we have reached the max number of parallel jobs
+ // we wait a little bit before continuing
+ if ( wait ) {
+ try {
+ Thread.sleep(sleepTime * 1000);
+ } catch (InterruptedException ie) {
+ // ignore
+ ignoreException(ie);
+ }
+ }
+ // if we have to put back the job, we do it now
+ if ( putback ) {
+ final EventInfo eInfo = info;
+ final Date fireDate = new Date();
+ fireDate.setTime(System.currentTimeMillis() + this.sleepTime * 1000);
+
+ // we put it back into the queue after a specific time
+ final Runnable r = new Runnable() {
+
+ /**
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ try {
+ queue.put(eInfo);
+ } catch (InterruptedException e) {
+ // ignore
+ ignoreException(e);
+ }
+ }
+
+ };
+ try {
+ this.scheduler.fireJobAt(null, r, null, fireDate);
+ } catch (Exception e) {
+ // we ignore the exception
+ ignoreException(e);
+ // then wait for the time and readd the job
+ try {
+ Thread.sleep(sleepTime * 1000);
+ } catch (InterruptedException ie) {
+ // ignore
+ ignoreException(ie);
+ }
+ r.run();
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @see org.apache.sling.engine.event.impl.JobPersistenceHandler#getEventNodeType()
+ */
+ protected String getEventNodeType() {
+ return EventHelper.JOB_NODE_TYPE;
+ }
+
+ /**
+ * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+ */
+ public void handleEvent(final Event event) {
+ logger.debug("Receiving event {}", event);
+ // we ignore remote job events
+ if ( EventUtil.isLocal(event) ) {
+ // check for bundle event
+ if ( event.getTopic().equals(EventUtil.TOPIC_JOB)) {
+ logger.debug("Handling local job {}", event);
+ // job event
+ final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+
+ // job topic must be set, otherwise we ignore this event!
+ if ( jobTopic != null ) {
+ // queue the event in order to respond quickly
+ try {
+ this.writeQueue.put(event);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ } else {
+ this.logger.warn("Event does not contain job topic: {}", event);
+ }
+
+ } else {
+ // bundle event started or updated
+ boolean doIt = false;
+ synchronized ( this.unloadedJobs ) {
+ if ( this.unloadedJobs.size() > 0 ) {
+ doIt = true;
+ }
+ }
+ if ( doIt ) {
+ final Runnable t = new Runnable() {
+
+ public void run() {
+ synchronized (unloadedJobs) {
+ Session s = null;
+ final Set<String> newUnloadedJobs = new HashSet<String>();
+ newUnloadedJobs.addAll(unloadedJobs);
+ try {
+ s = createSession();
+ for(String path : unloadedJobs ) {
+ newUnloadedJobs.remove(path);
+ try {
+ if ( s.itemExists(path) ) {
+ final Node eventNode = (Node) s.getItem(path);
+ if ( !eventNode.isLocked() ) {
+ try {
+ final EventInfo info = new EventInfo();
+ info.event = readEvent(eventNode);
+ info.nodePath = path;
+ try {
+ queue.put(info);
+ } catch (InterruptedException e) {
+ // we ignore this exception as this should never occur
+ ignoreException(e);
+ }
+ } catch (ClassNotFoundException cnfe) {
+ newUnloadedJobs.add(path);
+ ignoreException(cnfe);
+ }
+ }
+ }
+ } catch (RepositoryException re) {
+ // we ignore this and readd
+ newUnloadedJobs.add(path);
+ ignoreException(re);
+ }
+ }
+ } catch (RepositoryException re) {
+ // unable to create session, so we try it again next time
+ ignoreException(re);
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
+ unloadedJobs.clear();
+ unloadedJobs.addAll(newUnloadedJobs);
+ }
+ }
+ }
+
+ };
+ this.threadPool.execute(t);
+ }
+ }
+ }
+ }
+
+ /**
+ * Create a unique node path (folder and name) for the job.
+ */
+ private String getNodePath(final String jobTopic, final String jobId) {
+ if ( jobId != null ) {
+ return jobTopic.replace('/', '.') + "/" + EventHelper.filter(jobId);
+ }
+ return jobTopic.replace('/', '.') + "/Job " + UUID.randomUUID().toString();
+ }
+
+ /**
+ * Process a job and unlock the node in the repository.
+ * @param event The original event.
+ * @param eventNode The node in the repository where the job is stored.
+ * @param isMainQueue Is this the main queue?
+ */
+ private void processJob(Event event, Node eventNode, boolean isMainQueue) {
+ final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null
+ || event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+ final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+ boolean unlock = true;
+ try {
+ if ( isMainQueue && !parallelProcessing ) {
+ this.parallelJobCount++;
+ }
+ final String nodePath = eventNode.getPath();
+ final Event jobEvent = this.getJobEvent(event, nodePath);
+ eventNode.setProperty(EventHelper.NODE_PROPERTY_PROCESSOR, this.applicationId);
+ eventNode.save();
+ final EventAdmin localEA = this.eventAdmin;
+ if ( localEA != null ) {
+ final StartedJobInfo jobInfo = new StartedJobInfo(jobEvent, nodePath, System.currentTimeMillis());
+ // let's add the event to our processing list
+ synchronized ( this.processingEventsList ) {
+ this.processingEventsList.put(nodePath, jobInfo);
+ }
+
+ // we need async delivery, otherwise we might create a deadlock
+ // as this method runs inside a synchronized block and the finishedJob
+ // method as well!
+ localEA.postEvent(jobEvent);
+ // do not unlock if sending was successful
+ unlock = false;
+ } else {
+ this.logger.error("Job event can't be sent as no event admin is available.");
+ }
+ } catch (RepositoryException re) {
+ // if an exception occurs, we just log
+ this.logger.error("Exception during job processing.", re);
+ } finally {
+ if ( unlock ) {
+ if ( isMainQueue && !parallelProcessing ) {
+ this.parallelJobCount--;
+ }
+ if ( !parallelProcessing ) {
+ synchronized ( this.processingMap ) {
+ this.processingMap.put(jobTopic, Boolean.FALSE);
+ }
+ }
+ // unlock node
+ try {
+ eventNode.unlock();
+ } catch (RepositoryException e) {
+ // if unlock fails, we silently ignore this
+ this.ignoreException(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Create the real job event.
+ * This generates a new event object with the same properties, but with the
+ * {@link EventUtil#PROPERTY_JOB_TOPIC} topic.
+ * @param e The job event.
+ * @return The real job event.
+ */
+ private Event getJobEvent(Event e, String nodePath) {
+ final String eventTopic = (String)e.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+ final Dictionary<String, Object> properties = new EventPropertiesMap(e);
+ // put properties for finished job callback
+ properties.put(EventUtil.JobStatusNotifier.CONTEXT_PROPERTY_NAME,
+ new EventUtil.JobStatusNotifier.NotifierContext(this, nodePath));
+ return new Event(eventTopic, properties);
+ }
+
+ /**
+ * @see org.apache.sling.engine.event.impl.JobPersistenceHandler#addNodeProperties(javax.jcr.Node, org.osgi.service.event.Event)
+ */
+ protected void addNodeProperties(Node eventNode, Event event)
+ throws RepositoryException {
+ super.addNodeProperties(eventNode, event);
+ eventNode.setProperty(EventHelper.NODE_PROPERTY_TOPIC, (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC));
+ final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
+ if ( jobId != null ) {
+ eventNode.setProperty(EventHelper.NODE_PROPERTY_JOBID, jobId);
+ }
+ final long retryCount = OsgiUtil.toLong(event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT), 0);
+ final long retries = OsgiUtil.toLong(event.getProperty(EventUtil.PROPERTY_JOB_RETRIES), this.maxJobRetries);
+ eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT, retryCount);
+ eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRIES, retries);
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#addEventProperties(javax.jcr.Node, java.util.Dictionary)
+ */
+ protected void addEventProperties(Node eventNode,
+ Dictionary<String, Object> properties)
+ throws RepositoryException {
+ super.addEventProperties(eventNode, properties);
+ // convert to integers (jcr only supports long)
+ if ( properties.get(EventUtil.PROPERTY_JOB_RETRIES) != null ) {
+ properties.put(EventUtil.PROPERTY_JOB_RETRIES, Integer.valueOf(properties.get(EventUtil.PROPERTY_JOB_RETRIES).toString()));
+ }
+ if ( properties.get(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
+ properties.put(EventUtil.PROPERTY_JOB_RETRY_COUNT, Integer.valueOf(properties.get(EventUtil.PROPERTY_JOB_RETRY_COUNT).toString()));
+ }
+ // add application id
+ properties.put(EventUtil.PROPERTY_APPLICATION, eventNode.getProperty(EventHelper.NODE_PROPERTY_APPLICATION).getString());
+ }
+
+ /**
+ * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
+ */
+ public void onEvent(EventIterator iter) {
+ // we create an own session here
+ Session s = null;
+ try {
+ s = this.createSession();
+ while ( iter.hasNext() ) {
+ final javax.jcr.observation.Event event = iter.nextEvent();
+ if ( event.getType() == javax.jcr.observation.Event.PROPERTY_CHANGED
+ || event.getType() == javax.jcr.observation.Event.PROPERTY_REMOVED) {
+ try {
+ final String propPath = event.getPath();
+ int pos = propPath.lastIndexOf('/');
+ final String nodePath = propPath.substring(0, pos);
+ final String propertyName = propPath.substring(pos+1);
+
+ // we are only interested in unlocks
+ if ( "jcr:lockOwner".equals(propertyName) ) {
+ boolean doNotProcess = false;
+ synchronized ( this.deletedJobs ) {
+ doNotProcess = this.deletedJobs.remove(nodePath);
+ }
+ if ( !doNotProcess ) {
+ final Node eventNode = (Node) s.getItem(nodePath);
+ if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
+ try {
+ final EventInfo info = new EventInfo();
+ info.event = this.readEvent(eventNode);
+ info.nodePath = nodePath;
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // we ignore this exception as this should never occur
+ this.ignoreException(e);
+ }
+ } catch (ClassNotFoundException cnfe) {
+ // store path for lazy loading
+ synchronized ( this.unloadedJobs ) {
+ this.unloadedJobs.add(nodePath);
+ }
+ this.ignoreException(cnfe);
+ }
+ }
+ }
+ }
+ } catch (RepositoryException re) {
+ this.logger.error("Exception during jcr event processing.", re);
+ }
+ }
+ }
+ } catch (RepositoryException re) {
+ this.logger.error("Unable to create a session.", re);
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
+ }
+ }
+
+ /**
+ * Load all active jobs from the repository.
+ * @throws RepositoryException
+ */
+ private void loadJobs() {
+ try {
+ final QueryManager qManager = this.backgroundSession.getWorkspace().getQueryManager();
+ final StringBuffer buffer = new StringBuffer("/jcr:root");
+ buffer.append(this.repositoryPath);
+ buffer.append("//element(*, ");
+ buffer.append(this.getEventNodeType());
+ buffer.append(") order by @");
+ buffer.append(EventHelper.NODE_PROPERTY_CREATED);
+ buffer.append(" ascending");
+ final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
+ final NodeIterator result = q.execute().getNodes();
+ while ( result.hasNext() ) {
+ final Node eventNode = result.nextNode();
+ if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
+ final String nodePath = eventNode.getPath();
+ try {
+ final Event event = this.readEvent(eventNode);
+ final EventInfo info = new EventInfo();
+ info.event = event;
+ info.nodePath = nodePath;
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // we ignore this exception as this should never occur
+ this.ignoreException(e);
+ }
+ } catch (ClassNotFoundException cnfe) {
+ // store path for lazy loading
+ synchronized ( this.unloadedJobs ) {
+ this.unloadedJobs.add(nodePath);
+ }
+ this.ignoreException(cnfe);
+ } catch (RepositoryException re) {
+ this.logger.error("Unable to load stored job from " + nodePath, re);
+ }
+ }
+ }
+ } catch (RepositoryException re) {
+ this.logger.error("Exception during initial loading of stored jobs.", re);
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.EventUtil.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event, java.lang.String)
+ */
+ public boolean sendAcknowledge(Event job, String eventNodePath) {
+ synchronized ( this.processingEventsList ) {
+ // if the event is still in the processing list, we confirm the ack
+ final Object ack = this.processingEventsList.remove(eventNodePath);
+ return ack != null;
+ }
+
+ }
+
+ /**
+ * This is a notification from the component which processed the job.
+ *
+ * @see org.apache.sling.event.EventUtil.JobStatusNotifier#finishedJob(org.osgi.service.event.Event, String, boolean)
+ */
+ public boolean finishedJob(Event job, String eventNodePath, boolean shouldReschedule) {
+ // let's remove the event from our processing list
+ // this is just a sanity check, as usually the job should have been
+ // removed during sendAcknowledge.
+ synchronized ( this.processingEventsList ) {
+ this.processingEventsList.remove(eventNodePath);
+ }
+
+ boolean reschedule = shouldReschedule;
+ if ( shouldReschedule ) {
+ // check if we exceeded the number of retries
+ int retries = this.maxJobRetries;
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRIES) != null ) {
+ retries = (Integer) job.getProperty(EventUtil.PROPERTY_JOB_RETRIES);
+ }
+ int retryCount = 0;
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
+ retryCount = (Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT);
+ }
+ retryCount++;
+ if ( retries != -1 && retryCount > retries ) {
+ reschedule = false;
+ }
+ if ( reschedule ) {
+ // update event with retry count and retries
+ final Dictionary<String, Object> newProperties = new EventPropertiesMap(job);
+ newProperties.put(EventUtil.PROPERTY_JOB_RETRY_COUNT, retryCount);
+ newProperties.put(EventUtil.PROPERTY_JOB_RETRIES, retries);
+ job = new Event(job.getTopic(), newProperties);
+ }
+ }
+ final boolean parallelProcessing = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null
+ || job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+ EventInfo putback = null;
+ // we have to use the same session for unlocking that we used for locking!
+ synchronized ( this.backgroundLock ) {
+ // we might get here asnyc while this service has already been shutdown!
+ if ( this.backgroundSession == null ) {
+ // we can only return false here
+ return false;
+ }
+ try {
+ this.backgroundSession.refresh(false);
+ // check if the job has been cancelled
+ if ( !this.backgroundSession.itemExists(eventNodePath) ) {
+ return true;
+ }
+ final Node eventNode = (Node) this.backgroundSession.getItem(eventNodePath);
+ boolean unlock = true;
+ try {
+ if ( !reschedule ) {
+ synchronized ( this.deletedJobs ) {
+ this.deletedJobs.add(eventNodePath);
+ }
+ // unlock node
+ try {
+ eventNode.unlock();
+ } catch (RepositoryException e) {
+ // if unlock fails, we silently ignore this
+ this.ignoreException(e);
+ }
+ unlock = false;
+ final String jobId = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
+ if ( jobId == null ) {
+ // remove node from repository if no job is set
+ final Node parentNode = eventNode.getParent();
+ eventNode.remove();
+ parentNode.save();
+ } else {
+ eventNode.setProperty(EventHelper.NODE_PROPERTY_FINISHED, Calendar.getInstance());
+ eventNode.save();
+ }
+ }
+ } catch (RepositoryException re) {
+ // if an exception occurs, we just log
+ this.logger.error("Exception during job finishing.", re);
+ } finally {
+ if ( !parallelProcessing) {
+ final String jobTopic = (String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+ synchronized ( this.processingMap ) {
+ this.processingMap.put(jobTopic, Boolean.FALSE);
+ }
+ } else {
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) == null ) {
+ this.parallelJobCount--;
+ }
+ }
+ if ( unlock ) {
+ synchronized ( this.deletedJobs ) {
+ this.deletedJobs.add(eventNodePath);
+ }
+ // unlock node
+ try {
+ eventNode.unlock();
+ } catch (RepositoryException e) {
+ // if unlock fails, we silently ignore this
+ this.ignoreException(e);
+ }
+ }
+ }
+ if ( reschedule ) {
+ // update retry count and retries in the repository
+ try {
+ eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRIES, (Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRIES));
+ eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT, (Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT));
+ eventNode.save();
+ } catch (RepositoryException re) {
+ // if an exception occurs, we just log
+ this.logger.error("Exception during job updating job rescheduling information.", re);
+ }
+ final EventInfo info = new EventInfo();
+ try {
+ info.event = job;
+ info.nodePath = eventNode.getPath();
+ } catch (RepositoryException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ // if this is an own job queue, we simply signal the queue to continue
+ // it will pick up the event and either reschedule or wait
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
+ // we know the queue exists
+ final JobBlockingQueue jobQueue;
+ synchronized ( this.jobQueues ) {
+ jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME));
+ }
+ synchronized ( jobQueue.getLock()) {
+ jobQueue.notifyFinish(info);
+ }
+ } else {
+
+ // delay rescheduling?
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+ putback = info;
+ } else {
+ // put directly into queue
+ try {
+ queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ }
+ }
+ } else {
+ // if this is an own job queue, we simply signal the queue to continue
+ // it will pick up the event and continue with the next event
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
+ // we know the queue exists
+ final JobBlockingQueue jobQueue;
+ synchronized ( this.jobQueues ) {
+ jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME));
+ }
+ synchronized ( jobQueue.getLock()) {
+ jobQueue.notifyFinish(null);
+ }
+ }
+ }
+ } catch (RepositoryException re) {
+ this.logger.error("Unable to create new session.", re);
+ return false;
+ }
+ }
+ if ( putback != null ) {
+ final EventInfo info = putback;
+ final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+ final Date fireDate = new Date();
+ fireDate.setTime(System.currentTimeMillis() + delay);
+
+ final Runnable t = new Runnable() {
+ public void run() {
+ try {
+ queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen
+ ignoreException(e);
+ }
+ }
+ };
+ try {
+ this.scheduler.fireJobAt(null, t, null, fireDate);
+ } catch (Exception e) {
+ // we ignore the exception and just put back the job in the queue
+ ignoreException(e);
+ t.run();
+ }
+ }
+ if ( !shouldReschedule ) {
+ return true;
+ }
+ return reschedule;
+ }
+
+ /**
+ * Search for job nodes
+ * @param topic The job topic
+ * @param filterProps optional filter props
+ * @param locked only active jobs?
+ * @return
+ * @throws RepositoryException
+ */
+ private Collection<Event> queryJobs(final String topic,
+ final Boolean locked,
+ final Map<String, Object>... filterProps) {
+ // we create a new session
+ Session s = null;
+ final List<Event> jobs = new ArrayList<Event>();
+ try {
+ s = this.createSession();
+ final QueryManager qManager = s.getWorkspace().getQueryManager();
+ final StringBuffer buffer = new StringBuffer("/jcr:root");
+ buffer.append(this.repositoryPath);
+ if ( topic != null ) {
+ buffer.append('/');
+ buffer.append(topic.replace('/', '.'));
+ }
+ buffer.append("//element(*, ");
+ buffer.append(this.getEventNodeType());
+ buffer.append(") [not(@");
+ buffer.append(EventHelper.NODE_PROPERTY_FINISHED);
+ buffer.append(")");
+ if ( locked != null ) {
+ if ( locked ) {
+ buffer.append(" and @jcr:lockOwner");
+ } else {
+ buffer.append(" and not(@jcr:lockOwner)");
+ }
+ }
+ if ( filterProps != null && filterProps.length > 0 ) {
+ buffer.append(" and (");
+ int index = 0;
+ for (Map<String,Object> template : filterProps) {
+ if ( index > 0 ) {
+ buffer.append(" or ");
+ }
+ buffer.append('(');
+ final Iterator<Map.Entry<String, Object>> i = template.entrySet().iterator();
+ boolean first = true;
+ while ( i.hasNext() ) {
+ final Map.Entry<String, Object> current = i.next();
+ // check prop name first
+ final String propName = EventUtil.getNodePropertyName(current.getKey());
+ if ( propName != null ) {
+ // check value
+ final Value value = EventUtil.getNodePropertyValue(s.getValueFactory(), current.getValue());
+ if ( value != null ) {
+ if ( first ) {
+ first = false;
+ buffer.append('@');
+ } else {
+ buffer.append(" and @");
+ }
+ buffer.append(propName);
+ buffer.append(" = '");
+ buffer.append(current.getValue());
+ buffer.append("'");
+ }
+ }
+ }
+ buffer.append(')');
+ index++;
+ }
+ buffer.append(')');
+ }
+ buffer.append("]");
+ buffer.append(" order by @");
+ buffer.append(EventHelper.NODE_PROPERTY_CREATED);
+ buffer.append(" ascending");
+ final String queryString = buffer.toString();
+ logger.debug("Executing job query {}.", queryString);
+
+ final Query q = qManager.createQuery(queryString, Query.XPATH);
+ final NodeIterator iter = q.execute().getNodes();
+ while ( iter.hasNext() ) {
+ final Node eventNode = iter.nextNode();
+ try {
+ final Event event = this.readEvent(eventNode);
+ jobs.add(event);
+ } catch (ClassNotFoundException cnfe) {
+ // in the case of a class not found exception we just ignore the exception
+ this.ignoreException(cnfe);
+ }
+ }
+ } catch (RepositoryException e) {
+ // in the case of an error, we return an empty list
+ this.ignoreException(e);
+ } finally {
+ if ( s != null) {
+ s.logout();
+ }
+ }
+ return jobs;
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#getCurrentJobs(java.lang.String)
+ */
+ public Collection<Event> getCurrentJobs(String topic) {
+ return this.getCurrentJobs(topic, (Map<String, Object>[])null);
+ }
+
+ /**
+ * This is deprecated.
+ */
+ public Collection<Event> scheduledJobs(String topic) {
+ return this.getScheduledJobs(topic);
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#getScheduledJobs(java.lang.String)
+ */
+ public Collection<Event> getScheduledJobs(String topic) {
+ return this.getScheduledJobs(topic, (Map<String, Object>[])null);
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#getCurrentJobs(java.lang.String, java.util.Map...)
+ */
+ public Collection<Event> getCurrentJobs(String topic, Map<String, Object>... filterProps) {
+ return this.queryJobs(topic, true, filterProps);
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#getScheduledJobs(java.lang.String, java.util.Map...)
+ */
+ public Collection<Event> getScheduledJobs(String topic, Map<String, Object>... filterProps) {
+ return this.queryJobs(topic, false, filterProps);
+ }
+
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#getAllJobs(java.lang.String, java.util.Map...)
+ */
+ public Collection<Event> getAllJobs(String topic, Map<String, Object>... filterProps) {
+ return this.queryJobs(topic, null, filterProps);
+ }
+
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#cancelJob(java.lang.String, java.lang.String)
+ */
+ public void cancelJob(String topic, String jobId) {
+ if ( jobId != null && topic != null ) {
+ this.cancelJob(this.getNodePath(topic, jobId));
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#cancelJob(java.lang.String)
+ */
+ public void cancelJob(String jobId) {
+ if ( jobId != null ) {
+ synchronized ( this.writeLock ) {
+ try {
+ this.writerSession.refresh(false);
+ } catch (RepositoryException e) {
+ this.ignoreException(e);
+ }
+ try {
+ if ( this.writerSession.itemExists(jobId) ) {
+ final Item item = this.writerSession.getItem(jobId);
+ final Node parentNode = item.getParent();
+ item.remove();
+ parentNode.save();
+ }
+ } catch (RepositoryException e) {
+ this.logger.error("Error during cancelling job at " + jobId, e);
+ }
+ }
+ }
+ }
+
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#wakeUpJobQueue(java.lang.String)
+ */
+ public void wakeUpJobQueue(String jobQueueName) {
+ if ( jobQueueName != null ) {
+ synchronized ( this.jobQueues ) {
+ final JobBlockingQueue queue = this.jobQueues.get(jobQueueName);
+ if ( queue != null && queue.isSleeping() ) {
+ final String schedulerJobName = queue.getSchedulerJobName();
+ final Thread thread = queue.getSleepingThread();
+ if ( schedulerJobName != null ) {
+ this.scheduler.removeJob(schedulerJobName);
+ }
+ if ( thread != null ) {
+ thread.interrupt();
+ }
+ }
+ }
+ }
+ }
+
+
+ private static final class StartedJobInfo {
+ public final Event event;
+ public final String nodePath;
+ public final long started;
+
+ public StartedJobInfo(final Event e, final String path, final long started) {
+ this.event = e;
+ this.nodePath = path;
+ this.started = started;
+ }
+ }
+}
diff --git a/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java b/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
new file mode 100644
index 0000000..fd54071
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
@@ -0,0 +1,782 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import javax.jcr.Item;
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.Value;
+import javax.jcr.lock.Lock;
+import javax.jcr.lock.LockException;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.query.Query;
+import javax.jcr.query.QueryManager;
+
+import org.apache.sling.commons.scheduler.Job;
+import org.apache.sling.commons.scheduler.JobContext;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.TimedEventStatusProvider;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+
+/**
+ * An event handler for timed events.
+ *
+ * @scr.component metatype="no"
+ * @scr.service interface="TimedEventStatusProvider"
+ * @scr.property name="event.topics" refValues="EventUtil.TOPIC_TIMED_EVENT"
+ * values.updated="org/osgi/framework/BundleEvent/UPDATED"
+ * values.started="org/osgi/framework/BundleEvent/STARTED"
+ * @scr.property name="repository.path" value="/var/eventing/timed-jobs"
+ */
+public class TimedJobHandler
+ extends AbstractRepositoryEventHandler
+ implements Job, TimedEventStatusProvider {
+
+ protected static final String JOB_TOPIC = "topic";
+
+ protected static final String JOB_CONFIG = "config";
+
+ protected static final String JOB_SCHEDULE_INFO = "info";
+
+ /** @scr.reference */
+ protected Scheduler scheduler;
+
+ /** Unloaded events. */
+ protected Set<String>unloadedEvents = new HashSet<String>();
+
+ /** Sync lock */
+ private final Object writeLock = new Object();
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#startWriterSession()
+ */
+ protected void startWriterSession() throws RepositoryException {
+ super.startWriterSession();
+ // load timed events from repository
+ this.loadEvents();
+ this.writerSession.getWorkspace().getObservationManager()
+ .addEventListener(this, javax.jcr.observation.Event.PROPERTY_CHANGED|javax.jcr.observation.Event.PROPERTY_REMOVED, this.repositoryPath, true, null, null, true);
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
+ */
+ protected void processWriteQueue() {
+ while ( this.running ) {
+ Event event = null;
+ try {
+ event = this.writeQueue.take();
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
+ if ( this.running && event != null ) {
+ ScheduleInfo scheduleInfo = null;
+ try {
+ scheduleInfo = new ScheduleInfo(event);
+ } catch (IllegalArgumentException iae) {
+ this.logger.error(iae.getMessage());
+ }
+ if ( scheduleInfo != null ) {
+ final EventInfo info = new EventInfo();
+ info.event = event;
+
+ // write event and update path
+ // if something went wrong we get the node path and reschedule
+ synchronized ( this.writeLock ) {
+ info.nodePath = this.persistEvent(info.event, scheduleInfo);
+ }
+ if ( info.nodePath != null ) {
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen, so we ignore it
+ this.ignoreException(e);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#runInBackground()
+ */
+ protected void runInBackground() {
+ while ( this.running ) {
+ // so let's wait/get the next info from the queue
+ EventInfo info = null;
+ try {
+ info = this.queue.take();
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
+ if ( info != null && this.running ) {
+ synchronized ( this.writeLock ) {
+ ScheduleInfo scheduleInfo = null;
+ try {
+ scheduleInfo = new ScheduleInfo(info.event);
+ } catch (IllegalArgumentException iae) {
+ this.logger.error(iae.getMessage());
+ }
+ if ( scheduleInfo != null ) {
+ try {
+ this.writerSession.refresh(true);
+ if ( this.writerSession.itemExists(info.nodePath) ) {
+ final Node eventNode = (Node) this.writerSession.getItem(info.nodePath);
+ if ( !eventNode.isLocked() ) {
+ // lock node
+ Lock lock = null;
+ try {
+ lock = eventNode.lock(false, true);
+ } catch (RepositoryException re) {
+ // lock failed which means that the node is locked by someone else, so we don't have to requeue
+ }
+ if ( lock != null ) {
+ // if something went wrong, we reschedule
+ if ( !this.processEvent(info.event, scheduleInfo) ) {
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen, so we ignore it
+ this.ignoreException(e);
+ }
+ }
+ }
+ }
+ }
+ } catch (RepositoryException e) {
+ // ignore
+ this.ignoreException(e);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ protected String persistEvent(final Event event, final ScheduleInfo scheduleInfo) {
+ try {
+ // get parent node
+ final Node parentNode = this.ensureRepositoryPath();
+ final String nodeName = scheduleInfo.jobId;
+ // is there already a node?
+ final Node foundNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null;
+ Lock lock = null;
+ if ( scheduleInfo.isStopEvent() ) {
+ // if this is a stop event, we should remove the node from the repository
+ // if there is no node someone else was faster and we can ignore this
+ if ( foundNode != null ) {
+ try {
+ foundNode.remove();
+ parentNode.save();
+ } catch (LockException le) {
+ // if someone else has the lock this is fine
+ }
+ }
+ // stop the scheduler
+ processEvent(event, scheduleInfo);
+ } else {
+ // if there is already a node, it means we must handle an update
+ if ( foundNode != null ) {
+ try {
+ foundNode.remove();
+ parentNode.save();
+ } catch (LockException le) {
+ // if someone else has the lock this is fine
+ }
+ // create a stop event
+ processEvent(event, scheduleInfo.getStopInfo());
+ }
+ // we only write the event if this is a local one
+ if ( EventUtil.isLocal(event) ) {
+
+ // write event to repository, lock it and schedule the event
+ final Node eventNode = writeEvent(event, nodeName);
+ lock = eventNode.lock(false, true);
+ }
+ }
+
+ if ( lock != null ) {
+ // if something went wrong, we reschedule
+ if ( !this.processEvent(event, scheduleInfo) ) {
+ final String path = lock.getNode().getPath();
+ lock.getNode().unlock();
+ return path;
+ }
+ }
+ } catch (RepositoryException re ) {
+ // something went wrong, so let's log it
+ this.logger.error("Exception during writing new job to repository.", re);
+ }
+ return null;
+ }
+
+ /**
+ * Process the event.
+ * If a scheduler is available, a job is scheduled or stopped.
+ * @param event The incomming event.
+ * @return
+ */
+ protected boolean processEvent(final Event event, final ScheduleInfo scheduleInfo) {
+ final Scheduler localScheduler = this.scheduler;
+ if ( localScheduler != null ) {
+ // is this a stop event?
+ if ( scheduleInfo.isStopEvent() ) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Stopping timed event " + event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC) + "(" + scheduleInfo.jobId + ")");
+ }
+ try {
+ localScheduler.removeJob(scheduleInfo.jobId);
+ } catch (NoSuchElementException nsee) {
+ // this can happen if the job is scheduled on another node
+ // so we can just ignore this
+ }
+ return true;
+ }
+ // we ignore remote job events
+ if ( !EventUtil.isLocal(event) ) {
+ return true;
+ }
+
+ // Create configuration for scheduled job
+ final Map<String, Serializable> config = new HashMap<String, Serializable>();
+ // copy properties
+ final Hashtable<String, Object> properties = new Hashtable<String, Object>();
+ config.put(JOB_TOPIC, (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC));
+ final String[] names = event.getPropertyNames();
+ if ( names != null ) {
+ for(int i=0; i<names.length; i++) {
+ properties.put(names[i], event.getProperty(names[i]));
+ }
+ }
+ config.put(JOB_CONFIG, properties);
+ config.put(JOB_SCHEDULE_INFO, scheduleInfo);
+
+ try {
+ if ( scheduleInfo.expression != null ) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Adding timed event " + config.get(JOB_TOPIC) + "(" + scheduleInfo.jobId + ")" + " with cron expression " + scheduleInfo.expression);
+ }
+ localScheduler.addJob(scheduleInfo.jobId, this, config, scheduleInfo.expression, false);
+ } else if ( scheduleInfo.period != null ) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Adding timed event " + config.get(JOB_TOPIC) + "(" + scheduleInfo.jobId + ")" + " with period " + scheduleInfo.period);
+ }
+ localScheduler.addPeriodicJob(scheduleInfo.jobId, this, config, scheduleInfo.period, false);
+ } else {
+ // then it must be date
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Adding timed event " + config.get(JOB_TOPIC) + "(" + scheduleInfo.jobId + ")" + " with date " + scheduleInfo.date);
+ }
+ localScheduler.fireJobAt(scheduleInfo.jobId, this, config, scheduleInfo.date);
+ }
+ return true;
+ } catch (Exception e) {
+ this.ignoreException(e);
+ }
+ } else {
+ this.logger.error("No scheduler available to start timed event " + event);
+ }
+ return false;
+ }
+
+ /**
+ * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
+ */
+ public void onEvent(EventIterator iter) {
+ // we create an own session here
+ Session s = null;
+ try {
+ s = this.createSession();
+ while ( iter.hasNext() ) {
+ final javax.jcr.observation.Event event = iter.nextEvent();
+ if ( event.getType() == javax.jcr.observation.Event.PROPERTY_CHANGED
+ || event.getType() == javax.jcr.observation.Event.PROPERTY_REMOVED) {
+
+ final String propPath = event.getPath();
+ int pos = propPath.lastIndexOf('/');
+ final String nodePath = propPath.substring(0, pos);
+ final String propertyName = propPath.substring(pos+1);
+ // we are only interested in unlocks
+ if ( "jcr:lockOwner".equals(propertyName) ) {
+ try {
+ final Node eventNode = (Node) s.getItem(nodePath);
+ if ( !eventNode.isLocked() ) {
+ try {
+ final EventInfo info = new EventInfo();
+ info.event = this.readEvent(eventNode);
+ info.nodePath =nodePath;
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // we ignore this exception as this should never occur
+ this.ignoreException(e);
+ }
+ } catch (ClassNotFoundException cnfe) {
+ // add it to the unloaded set
+ synchronized (unloadedEvents) {
+ this.unloadedEvents.add(nodePath);
+ }
+ this.ignoreException(cnfe);
+ }
+ }
+ } catch (RepositoryException re) {
+ this.logger.error("Exception during jcr event processing.", re);
+ }
+ }
+ }
+ }
+ } catch (RepositoryException re) {
+ this.logger.error("Unable to create a session.", re);
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
+ }
+ }
+
+ /**
+ * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+ */
+ public void handleEvent(Event event) {
+ if ( event.getTopic().equals(EventUtil.TOPIC_TIMED_EVENT) ) {
+ // queue the event in order to respond quickly
+ try {
+ this.writeQueue.put(event);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ } else {
+ // bundle event started or updated
+ boolean doIt = false;
+ synchronized ( this.unloadedEvents ) {
+ if ( this.unloadedEvents.size() > 0 ) {
+ doIt = true;
+ }
+ }
+ if ( doIt ) {
+ final Runnable t = new Runnable() {
+
+ public void run() {
+ synchronized (unloadedEvents) {
+ Session s = null;
+ final Set<String> newUnloadedEvents = new HashSet<String>();
+ newUnloadedEvents.addAll(unloadedEvents);
+ try {
+ s = createSession();
+ for(String path : unloadedEvents ) {
+ newUnloadedEvents.remove(path);
+ try {
+ if ( s.itemExists(path) ) {
+ final Node eventNode = (Node) s.getItem(path);
+ if ( !eventNode.isLocked() ) {
+ try {
+ final EventInfo info = new EventInfo();
+ info.event = readEvent(eventNode);
+ info.nodePath = path;
+ try {
+ queue.put(info);
+ } catch (InterruptedException e) {
+ // we ignore this exception as this should never occur
+ ignoreException(e);
+ }
+ } catch (ClassNotFoundException cnfe) {
+ newUnloadedEvents.add(path);
+ ignoreException(cnfe);
+ }
+ }
+ }
+ } catch (RepositoryException re) {
+ // we ignore this and readd
+ newUnloadedEvents.add(path);
+ ignoreException(re);
+ }
+ }
+ } catch (RepositoryException re) {
+ // unable to create session, so we try it again next time
+ ignoreException(re);
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
+ unloadedEvents.clear();
+ unloadedEvents.addAll(newUnloadedEvents);
+ }
+ }
+ }
+
+ };
+ this.threadPool.execute(t);
+ }
+ }
+ }
+
+ /**
+ * @see org.apache.sling.commons.scheduler.Job#execute(org.apache.sling.commons.scheduler.JobContext)
+ */
+ public void execute(JobContext context) {
+ final String topic = (String) context.getConfiguration().get(JOB_TOPIC);
+ @SuppressWarnings("unchecked")
+ final Dictionary<Object, Object> properties = (Dictionary<Object, Object>) context.getConfiguration().get(JOB_CONFIG);
+ final EventAdmin ea = this.eventAdmin;
+ if ( ea != null ) {
+ try {
+ ea.postEvent(new Event(topic, properties));
+ } catch (IllegalArgumentException iae) {
+ this.logger.error("Scheduled event has illegal topic: " + topic, iae);
+ }
+ } else {
+ this.logger.warn("Unable to send timed event as no event admin service is available.");
+ }
+ final ScheduleInfo info = (ScheduleInfo) context.getConfiguration().get(JOB_SCHEDULE_INFO);
+ // is this job scheduled for a specific date?
+ if ( info.date != null ) {
+ // we can remove it from the repository
+ // we create an own session here
+ Session s = null;
+ try {
+ s = this.createSession();
+ if ( s.itemExists(this.repositoryPath) ) {
+ final Node parentNode = (Node)s.getItem(this.repositoryPath);
+ final String nodeName = info.jobId;
+ final Node eventNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null;
+ if ( eventNode != null ) {
+ try {
+ eventNode.remove();
+ parentNode.save();
+ } catch (RepositoryException re) {
+ // we ignore the exception if removing fails
+ ignoreException(re);
+ }
+ }
+ }
+ } catch (RepositoryException re) {
+ this.logger.error("Unable to create a session.", re);
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
+ }
+ }
+ }
+
+ /**
+ * Load all active timed events from the repository.
+ * @throws RepositoryException
+ */
+ protected void loadEvents() {
+ try {
+ final QueryManager qManager = this.writerSession.getWorkspace().getQueryManager();
+ final StringBuffer buffer = new StringBuffer("/jcr:root");
+ buffer.append(this.repositoryPath);
+ buffer.append("//element(*, ");
+ buffer.append(this.getEventNodeType());
+ buffer.append(")");
+ final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
+ final NodeIterator result = q.execute().getNodes();
+ while ( result.hasNext() ) {
+ final Node eventNode = result.nextNode();
+ if ( !eventNode.isLocked() ) {
+ final String nodePath = eventNode.getPath();
+ try {
+ final Event event = this.readEvent(eventNode);
+ final EventInfo info = new EventInfo();
+ info.event = event;
+ info.nodePath = nodePath;
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // we ignore this exception as this should never occur
+ this.ignoreException(e);
+ }
+ } catch (ClassNotFoundException cnfe) {
+ // add it to the unloaded set
+ synchronized (unloadedEvents) {
+ this.unloadedEvents.add(nodePath);
+ }
+ this.ignoreException(cnfe);
+ } catch (RepositoryException re) {
+ // if reading an event fails, we ignore this
+ this.ignoreException(re);
+ }
+ }
+ }
+ } catch (RepositoryException re) {
+ this.logger.error("Exception during initial loading of stored timed events.", re);
+ }
+ }
+
+ /**
+ * @see org.apache.sling.engine.event.impl.JobPersistenceHandler#addNodeProperties(javax.jcr.Node, org.osgi.service.event.Event)
+ */
+ protected void addNodeProperties(Node eventNode, Event event)
+ throws RepositoryException {
+ super.addNodeProperties(eventNode, event);
+ eventNode.setProperty(EventHelper.NODE_PROPERTY_TOPIC, (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC));
+ final ScheduleInfo info = new ScheduleInfo(event);
+ if ( info.date != null ) {
+ final Calendar c = Calendar.getInstance();
+ c.setTime(info.date);
+ eventNode.setProperty(EventHelper.NODE_PROPERTY_TE_DATE, c);
+ }
+ if ( info.expression != null ) {
+ eventNode.setProperty(EventHelper.NODE_PROPERTY_TE_EXPRESSION, info.expression);
+ }
+ if ( info.period != null ) {
+ eventNode.setProperty(EventHelper.NODE_PROPERTY_TE_PERIOD, info.period.longValue());
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#getEventNodeType()
+ */
+ protected String getEventNodeType() {
+ return EventHelper.TIMED_EVENT_NODE_TYPE;
+ }
+
+ protected static final class ScheduleInfo implements Serializable {
+
+ public final String expression;
+ public final Long period;
+ public final Date date;
+ public final String jobId;
+
+ public ScheduleInfo(final Event event)
+ throws IllegalArgumentException {
+ // let's see if a schedule information is specified or if the job should be stopped
+ this.expression = (String) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_SCHEDULE);
+ this.period = (Long) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_PERIOD);
+ this.date = (Date) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_DATE);
+ int count = 0;
+ if ( this.expression != null) {
+ count++;
+ }
+ if ( this.period != null ) {
+ count++;
+ }
+ if ( this.date != null ) {
+ count++;
+ }
+ if ( count > 1 ) {
+ throw new IllegalArgumentException("Only one configuration property from " + EventUtil.PROPERTY_TIMED_EVENT_SCHEDULE +
+ ", " + EventUtil.PROPERTY_TIMED_EVENT_PERIOD +
+ ", or " + EventUtil.PROPERTY_TIMED_EVENT_DATE + " should be used.");
+ }
+ // we create a job id consisting of the real event topic and an (optional) id
+ // if the event contains a timed event id or a job id we'll append that to the name
+ String topic = (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC);
+ if ( topic == null ) {
+ throw new IllegalArgumentException("Timed event does not contain required property " + EventUtil.PROPERTY_TIMED_EVENT_TOPIC);
+ }
+ String id = (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_ID);
+ String jId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
+
+ //this.jobId = getJobId(topic, id, jId);
+ this.jobId = getJobId(topic, id, jId);
+ }
+
+ private ScheduleInfo(String jobId) {
+ this.expression = null;
+ this.period = null;
+ this.date = null;
+ this.jobId = jobId;
+ }
+
+ public ScheduleInfo getStopInfo() {
+ return new ScheduleInfo(this.jobId);
+ }
+
+ public boolean isStopEvent() {
+ return this.expression == null && this.period == null && this.date == null;
+ }
+
+ public static String getJobId(String topic, String timedEventId, String jobId) {
+ return topic.replace('/', '.') + "/TimedEvent " + (timedEventId != null ? EventHelper.filter(timedEventId) : "") + '_' + (jobId != null ? EventHelper.filter(jobId) : "");
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.TimedEventStatusProvider#getScheduledEvent(java.lang.String, java.lang.String, java.lang.String)
+ */
+ public Event getScheduledEvent(String topic, String eventId, String jobId) {
+ Session s = null;
+ try {
+ s = this.createSession();
+ if ( s.itemExists(this.repositoryPath) ) {
+ final Node parentNode = (Node)s.getItem(this.repositoryPath);
+ final String nodeName = ScheduleInfo.getJobId(topic, eventId, jobId);
+ final Node eventNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null;
+ if ( eventNode != null ) {
+ return this.readEvent(eventNode);
+ }
+ }
+ } catch (RepositoryException re) {
+ this.logger.error("Unable to create a session.", re);
+ } catch (ClassNotFoundException e) {
+ this.ignoreException(e);
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * @see org.apache.sling.event.TimedEventStatusProvider#getScheduledEvents(java.lang.String, java.util.Map...)
+ */
+ public Collection<Event> getScheduledEvents(String topic, Map<String, Object>... filterProps) {
+ // we create a new session
+ Session s = null;
+ final List<Event> jobs = new ArrayList<Event>();
+ try {
+ s = this.createSession();
+ final QueryManager qManager = s.getWorkspace().getQueryManager();
+ final StringBuffer buffer = new StringBuffer("/jcr:root");
+ buffer.append(this.repositoryPath);
+ if ( topic != null ) {
+ buffer.append('/');
+ buffer.append(topic.replace('/', '.'));
+ }
+ buffer.append("//element(*, ");
+ buffer.append(this.getEventNodeType());
+ buffer.append(")");
+ if ( filterProps != null && filterProps.length > 0 ) {
+ buffer.append(" [");
+ int index = 0;
+ for (Map<String,Object> template : filterProps) {
+ if ( index > 0 ) {
+ buffer.append(" or ");
+ }
+ buffer.append('(');
+ final Iterator<Map.Entry<String, Object>> i = template.entrySet().iterator();
+ boolean first = true;
+ while ( i.hasNext() ) {
+ final Map.Entry<String, Object> current = i.next();
+ // check prop name first
+ final String propName = EventUtil.getNodePropertyName(current.getKey());
+ if ( propName != null ) {
+ // check value
+ final Value value = EventUtil.getNodePropertyValue(s.getValueFactory(), current.getValue());
+ if ( value != null ) {
+ if ( first ) {
+ first = false;
+ buffer.append('@');
+ } else {
+ buffer.append(" and @");
+ }
+ buffer.append(propName);
+ buffer.append(" = '");
+ buffer.append(current.getValue());
+ buffer.append("'");
+ }
+ }
+ }
+ buffer.append(')');
+ index++;
+ }
+ buffer.append(']');
+ }
+ final String queryString = buffer.toString();
+ logger.debug("Executing job query {}.", queryString);
+
+ final Query q = qManager.createQuery(queryString, Query.XPATH);
+ final NodeIterator iter = q.execute().getNodes();
+ while ( iter.hasNext() ) {
+ final Node eventNode = iter.nextNode();
+ try {
+ final Event event = this.readEvent(eventNode);
+ jobs.add(event);
+ } catch (ClassNotFoundException cnfe) {
+ // in the case of a class not found exception we just ignore the exception
+ this.ignoreException(cnfe);
+ }
+ }
+ } catch (RepositoryException e) {
+ // in the case of an error, we return an empty list
+ this.ignoreException(e);
+ } finally {
+ if ( s != null) {
+ s.logout();
+ }
+ }
+ return jobs;
+ }
+
+ /**
+ * @see org.apache.sling.event.TimedEventStatusProvider#cancelTimedEvent(java.lang.String)
+ */
+ public void cancelTimedEvent(String jobId) {
+ synchronized ( this.writeLock ) {
+ try {
+ // is there a node?
+ final Item foundNode = this.writerSession.itemExists(jobId) ? this.writerSession.getItem(jobId) : null;
+ // we should remove the node from the repository
+ // if there is no node someone else was faster and we can ignore this
+ if ( foundNode != null ) {
+ final Node parentNode = foundNode.getParent();
+ try {
+ foundNode.remove();
+ parentNode.save();
+ } catch (LockException le) {
+ // if someone else has the lock this is fine
+ }
+ }
+ } catch ( RepositoryException re) {
+ this.logger.error("Unable to cancel timed event: " + jobId, re);
+ }
+ // stop the scheduler
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Stopping timed event " + jobId);
+ }
+ final Scheduler localScheduler = this.scheduler;
+ if ( localScheduler != null ) {
+ try {
+ localScheduler.removeJob(jobId);
+ } catch (NoSuchElementException nsee) {
+ // this can happen if the job is scheduled on another node
+ // so we can just ignore this
+ }
+ }
+ }
+ }
+}
diff --git a/src/main/resources/META-INF/DISCLAIMER b/src/main/resources/META-INF/DISCLAIMER
new file mode 100644
index 0000000..90850c2
--- /dev/null
+++ b/src/main/resources/META-INF/DISCLAIMER
@@ -0,0 +1,7 @@
+Apache Sling is an effort undergoing incubation at The Apache Software Foundation (ASF),
+sponsored by the Apache Jackrabbit PMC. Incubation is required of all newly accepted
+projects until a further review indicates that the infrastructure, communications,
+and decision making process have stabilized in a manner consistent with other
+successful ASF projects. While incubation status is not necessarily a reflection of
+the completeness or stability of the code, it does indicate that the project has yet
+to be fully endorsed by the ASF.
\ No newline at end of file
diff --git a/src/main/resources/META-INF/LICENSE b/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/src/main/resources/META-INF/NOTICE b/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..32a792a
--- /dev/null
+++ b/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,8 @@
+Apache Sling Event
+Copyright 2008-2009 The Apache Software Foundation
+
+Apache Sling is based on source code originally developed
+by Day Software (http://www.day.com/).
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
diff --git a/src/main/resources/OSGI-INF/metatype/metatype.properties b/src/main/resources/OSGI-INF/metatype/metatype.properties
new file mode 100644
index 0000000..d3a14ad
--- /dev/null
+++ b/src/main/resources/OSGI-INF/metatype/metatype.properties
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# This file contains localization strings for configuration labels and
+# descriptions as used in the metatype.xml descriptor generated by the
+# the SCR plugin
+
+#
+# Distributing Event Handler
+dist.events.name = Apache Sling Distributing Event Handler
+dist.events.description = Distributes local OSGi Event Admin events to \
+ other nodes of the same cluster. The events are written to the JCR \
+ repository for distribution to other nodes while events written to the \
+ repository are picked up and distributed locally through the OSGi Event Admin \
+ Service.
+
+#
+# Job Event Handler
+job.events.name = Apache Sling Job Event Handler
+job.events.description = Manages job scheduling on a single system as well \
+ as on a cluster. A Job runs only on a single cluster node. \
+ The respective scheduling is persisted in the repository and distributed \
+ amongst the cluster nodes through repository events. The jobs are started \
+ locally on a single cluster node through the OSGi Event Admin.
+
+sleep.time.name = Retry Interval
+sleep.time.description = The number of milliseconds to sleep between two \
+ consecutive retries of a job which failed and was set to be retried. The \
+ default value is 30 seconds. This value is only relevant if there is a single \
+ failed job in the queue. If there are multiple failed jobs, each job is \
+ retried in turn without an intervening delay.
+
+max.job.retries.name = Maximum Retries
+max.job.retries.description = The maximum number of times a failed job slated \
+ for retries is actually retried. If a job has been retried this number of \
+ times and still fails, it is not rescheduled and assumed to have failed. The \
+ default value is 10.
+
+jobscheduler.period.name = Event Cleanup Internal
+jobscheduler.period.description = Interval in seconds in which jobs older than \
+ a specific age (see Event Cleanup Age) are purged from the repository. \
+ The default value is 5 minutes (300 seconds).
+
+jobcleanup.period.name = Event Cleanup Age
+jobcleanup.period.description = The maximum age in minutes of persisted job to \
+ be purged from the repository during the cleanup run. The default is 5 \
+ minutes. Note that this setting defines the minimum time an event remains \
+ in the repository.
+
+wait.for.ack.name = Acknowledge Waiting Time
+wait.for.ack.description = If a service is processing a job, it acknowledges this \
+ by sending a message to the Job Event Handler. If the Job Event Handler does not \
+ receive such a message in the configured time, it reschedules the job. The configured \
+ time is in seconds (default is 90 secs).
+
+max.parallel.jobs.name = Maximum Parallel Jobs
+max.parallel.jobs.description = The maximum number of parallel jobs started for the main \
+ queue.
+
+
+#
+# Event Pool
+event.pool.name = Apache Sling Event Thread Pool
+event.pool.description = This is the thread pool used by the Apache Sling eventing support.
+
+minPoolSize.name = Min Pool Size
+minPoolSize.description = The minimum pool size. The minimum pool size should be \
+ higher than 20. Approx 10 threads are in use by the system, so a pool size of 20 \
+ allows to process 10 events in parallel.
+
+maxPoolSize.name = Max Pool Size
+maxPoolSize.description = The maximum pool size. The maximum pool size should be higher than \
+ the minimum pool size.
+
+queueSize.name = Queue Size
+queueSize.description = The maximum size of the thread queue if the pool is exhausted.
+
+#
+# Shared labels
+scheduler.period.name = Event Cleanup Internal
+scheduler.period.description = Interval in seconds in which events older than \
+ a specific age (see Event Cleanup Age) are purged from the repository. \
+ The default value is 30 minutes (1800 seconds).
+
+cleanup.period.name = Event Cleanup Age
+cleanup.period.description = The maximum age in minutes of persisted events to \
+ be purged from the repository during the cleanup run. The default is 15 \
+ minutes. Note that this setting defines the minimum time an event remains \
+ in the repository.
+
+repository.path.name = Persistent Event Location
+repository.path.description = Absolute Path of the Repository location where \
+ events are persisted to be picked up by the event distribution mechanism. \
+ The default value is "/sling/events".
diff --git a/src/main/resources/SLING-INF/nodetypes/event.cnd b/src/main/resources/SLING-INF/nodetypes/event.cnd
new file mode 100644
index 0000000..52ec213
--- /dev/null
+++ b/src/main/resources/SLING-INF/nodetypes/event.cnd
@@ -0,0 +1,42 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+<slingevent='http://sling.apache.org/jcr/event/1.0'>
+<nt='http://www.jcp.org/jcr/nt/1.0'>
+<mix='http://www.jcp.org/jcr/mix/1.0'>
+
+[slingevent:Event] > nt:unstructured, nt:hierarchyNode
+ - slingevent:topic (string)
+ - slingevent:application (string)
+ - slingevent:created (date)
+ - slingevent:properties (binary)
+
+[slingevent:Job] > slingevent:Event, mix:lockable
+ - slingevent:processor (string)
+ - slingevent:id (string)
+ - slingevent:finished (date)
+
+[slingevent:TimedEvent] > slingevent:Event, mix:lockable
+ - slingevent:processor (string)
+ - slingevent:id (string)
+ - slingevent:expression (string)
+ - slingevent:date (date)
+ - slingevent:period (long)
+
+
diff --git a/src/test/java/org/apache/sling/event/EventUtilTest.java b/src/test/java/org/apache/sling/event/EventUtilTest.java
new file mode 100644
index 0000000..b55e229
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/EventUtilTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.InputStream;
+import java.util.Calendar;
+import java.util.Properties;
+
+import javax.jcr.PropertyType;
+import javax.jcr.RepositoryException;
+import javax.jcr.Value;
+import javax.jcr.ValueFactory;
+import javax.jcr.ValueFormatException;
+
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.jmock.integration.junit4.JUnit4Mockery;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.osgi.service.event.Event;
+
+/**
+ * Tests for the EventUtil utility methods.
+ */
+@RunWith(JMock.class)
+public class EventUtilTest {
+
+ protected Mockery context;
+
+ public EventUtilTest() {
+ this.context = new JUnit4Mockery();
+ }
+
+ @Test public void testDistributeFlag() {
+ final Event distributableEvent = EventUtil.createDistributableEvent("some/topic", null);
+ assertTrue(EventUtil.shouldDistribute(distributableEvent));
+ final Event nonDistributableEvent = new Event("another/topic", null);
+ assertFalse(EventUtil.shouldDistribute(nonDistributableEvent));
+ }
+
+ @Test public void testLocalFlag() {
+ final Event localEvent = new Event("local/event", null);
+ assertTrue(EventUtil.isLocal(localEvent));
+ final Properties props = new Properties();
+ props.put(EventUtil.PROPERTY_APPLICATION, "application1");
+ final Event remoteEvent = new Event("remote/event", props);
+ assertFalse(EventUtil.isLocal(remoteEvent));
+ }
+
+ @Test public void testGetNodePropertyValue() {
+ final ValueFactory factory = this.context.mock(ValueFactory.class);
+ this.context.checking(new Expectations() {{
+ allowing(factory).createValue(true);
+ will(returnValue(new ValueImpl(PropertyType.BOOLEAN)));
+ allowing(factory).createValue(false);
+ will(returnValue(new ValueImpl(PropertyType.BOOLEAN)));
+ allowing(factory).createValue(with(any(Long.class)));
+ will(returnValue(new ValueImpl(PropertyType.LONG)));
+ allowing(factory).createValue(with(any(String.class)));
+ will(returnValue(new ValueImpl(PropertyType.STRING)));
+ allowing(factory).createValue(with(any(Calendar.class)));
+ will(returnValue(new ValueImpl(PropertyType.DATE)));
+ }});
+ // boolean
+ assertEquals(PropertyType.BOOLEAN, EventUtil.getNodePropertyValue(factory, true).getType());
+ assertEquals(PropertyType.BOOLEAN, EventUtil.getNodePropertyValue(factory, false).getType());
+ assertEquals(PropertyType.BOOLEAN, EventUtil.getNodePropertyValue(factory, Boolean.TRUE).getType());
+ assertEquals(PropertyType.BOOLEAN, EventUtil.getNodePropertyValue(factory, Boolean.FALSE).getType());
+ // long
+ assertEquals(PropertyType.LONG, EventUtil.getNodePropertyValue(factory, (long)5).getType());
+ // int = not possible
+ assertEquals(null, EventUtil.getNodePropertyValue(factory, 5));
+ // string
+ assertEquals(PropertyType.STRING, EventUtil.getNodePropertyValue(factory, "something").getType());
+ // calendar
+ assertEquals(PropertyType.DATE, EventUtil.getNodePropertyValue(factory, Calendar.getInstance()).getType());
+ }
+
+ private final static class ValueImpl implements Value {
+
+ private final int type;
+
+ public ValueImpl(int type) {
+ this.type = type;
+ }
+
+ public boolean getBoolean() throws ValueFormatException,
+ IllegalStateException, RepositoryException {
+ return false;
+ }
+
+ public Calendar getDate() throws ValueFormatException,
+ IllegalStateException, RepositoryException {
+ return null;
+ }
+
+ public double getDouble() throws ValueFormatException,
+ IllegalStateException, RepositoryException {
+ return 0;
+ }
+
+ public long getLong() throws ValueFormatException,
+ IllegalStateException, RepositoryException {
+ return 0;
+ }
+
+ public InputStream getStream() throws IllegalStateException,
+ RepositoryException {
+ return null;
+ }
+
+ public String getString() throws ValueFormatException,
+ IllegalStateException, RepositoryException {
+ return null;
+ }
+
+ public int getType() {
+ return this.type;
+ }
+ }
+}
diff --git a/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java b/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
new file mode 100644
index 0000000..fb0d801
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.observation.EventListenerIterator;
+
+import org.apache.sling.commons.testing.jcr.RepositoryUtil;
+import org.apache.sling.commons.threads.ThreadPoolConfig;
+import org.apache.sling.engine.SlingSettingsService;
+import org.apache.sling.event.ThreadPool;
+import org.apache.sling.jcr.api.SlingRepository;
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.junit.runner.RunWith;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+@RunWith(JMock.class)
+public abstract class AbstractRepositoryEventHandlerTest {
+
+ protected AbstractRepositoryEventHandler handler;
+
+ protected static final String REPO_PATH = "/test/events";
+ protected static final String SLING_ID = "4711";
+
+ protected static Session session;
+
+ protected abstract Mockery getMockery();
+
+ protected Dictionary<String, Object> getComponentConfig() {
+ final Dictionary<String, Object> config = new Hashtable<String, Object>();
+ config.put(AbstractRepositoryEventHandler.CONFIG_PROPERTY_REPO_PATH, REPO_PATH);
+
+ return config;
+ }
+
+ @org.junit.BeforeClass public static void setupRepository() throws Exception {
+ RepositoryUtil.startRepository();
+ final SlingRepository repository = RepositoryUtil.getRepository();
+ session = repository.loginAdministrative(repository.getDefaultWorkspace());
+ assertTrue(RepositoryUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/event.cnd")));
+ assertTrue(RepositoryUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/folder.cnd")));
+ }
+
+ @org.junit.AfterClass public static void shutdownRepository() throws Exception {
+ RepositoryUtil.stopRepository();
+ }
+
+ @org.junit.Before public void setup() throws Exception {
+ this.handler.repository = RepositoryUtil.getRepository();
+
+ // the event admin
+ final EventAdmin eventAdmin = this.getMockery().mock(EventAdmin.class);
+ this.handler.eventAdmin = eventAdmin;
+ this.getMockery().checking(new Expectations() {{
+ allowing(eventAdmin).postEvent(with(any(Event.class)));
+ allowing(eventAdmin).sendEvent(with(any(Event.class)));
+ }});
+
+ // sling settings service
+ this.handler.settingsService = new SlingSettingsService() {
+ public String getSlingId() {
+ return SLING_ID;
+ }
+ };
+
+ // we need a thread pool
+ this.handler.threadPool = new ThreadPoolImpl();
+
+ // lets set up the bundle context
+ final BundleContext bundleContext = this.getMockery().mock(BundleContext.class);
+
+ // lets set up the component configuration
+ final Dictionary<String, Object> componentConfig = this.getComponentConfig();
+
+ // lets set up the compnent context
+ final ComponentContext componentContext = this.getMockery().mock(ComponentContext.class);
+ this.getMockery().checking(new Expectations() {{
+ allowing(componentContext).getBundleContext();
+ will(returnValue(bundleContext));
+ allowing(componentContext).getProperties();
+ will(returnValue(componentConfig));
+ }});
+
+ this.handler.activate(componentContext);
+ // the session is initialized in the background, so let's sleep some seconds
+ Thread.sleep(2 * 1000);
+ }
+
+ @org.junit.After public void shutdown() throws Exception {
+ // delete all child nodes to get a clean repository again
+ final Node rootNode = (Node) session.getItem(this.handler.repositoryPath);
+ final NodeIterator iter = rootNode.getNodes();
+ while ( iter.hasNext() ) {
+ final Node child = iter.nextNode();
+ child.remove();
+ }
+ rootNode.save();
+ // lets set up the bundle context with the sling id
+ final BundleContext bundleContext = this.getMockery().mock(BundleContext.class);
+
+ final ComponentContext componentContext = this.getMockery().mock(ComponentContext.class);
+ this.getMockery().checking(new Expectations() {{
+ allowing(componentContext).getBundleContext();
+ will(returnValue(bundleContext));
+ }});
+ this.handler.deactivate(componentContext);
+ }
+
+ @org.junit.Test public void testSetup() throws RepositoryException {
+ assertEquals(this.handler.applicationId, SLING_ID);
+ assertEquals(this.handler.repositoryPath, REPO_PATH);
+ assertNotNull(this.handler.writerSession);
+ final EventListenerIterator iter = this.handler.writerSession.getWorkspace().getObservationManager().getRegisteredEventListeners();
+ boolean found = false;
+ while ( !found && iter.hasNext() ) {
+ final javax.jcr.observation.EventListener listener = iter.nextEventListener();
+ found = (listener == this.handler);
+ }
+ assertTrue("Handler is not registered as event listener.", found);
+ }
+
+ @org.junit.Test public void testPathCreation() throws RepositoryException {
+ assertTrue(session.itemExists(REPO_PATH));
+ }
+
+ final class ThreadPoolImpl implements ThreadPool {
+
+ public void execute(Runnable runnable) {
+ final Thread t = new Thread(runnable);
+ t.start();
+ }
+
+ public String getName() {
+ return EventHelper.THREAD_POOL_NAME;
+ }
+
+ public void shutdown() {
+ // nothing to do
+ }
+
+ public ThreadPoolConfig getConfiguration() {
+ return new ThreadPoolConfig();
+ }
+
+ }
+}
diff --git a/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java b/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java
new file mode 100644
index 0000000..7ad96d4
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Calendar;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+
+import org.apache.jackrabbit.util.ISO9075;
+import org.apache.sling.event.EventUtil;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.jmock.integration.junit4.JUnit4Mockery;
+import org.junit.runner.RunWith;
+import org.osgi.service.event.Event;
+
+@RunWith(JMock.class)
+public class DistributingEventHandlerTest extends AbstractRepositoryEventHandlerTest {
+
+ protected Mockery context;
+
+ public DistributingEventHandlerTest() {
+ this.handler = new DistributingEventHandler();
+ this.context = new JUnit4Mockery();
+ }
+
+ @Override
+ protected Mockery getMockery() {
+ return this.context;
+ }
+
+ @org.junit.Test public void testWriteEvent() throws Exception {
+ final String topic = "write/event/test";
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put("a property", "some value");
+ final Event e = new Event(topic, props);
+ this.handler.writeEvent(e, null);
+
+ final Node rootNode = (Node) session.getItem(this.handler.repositoryPath);
+ final NodeIterator iter = rootNode.getNodes();
+ iter.hasNext();
+ final Node eventNode = iter.nextNode();
+ assertEquals(topic, eventNode.getProperty(EventHelper.NODE_PROPERTY_TOPIC).getString());
+ assertEquals(handler.applicationId, eventNode.getProperty(EventHelper.NODE_PROPERTY_APPLICATION).getString());
+ assertTrue(Calendar.getInstance().compareTo(eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getDate()) >= 0);
+ // as a starting point we just check if the properties property exists
+ assertTrue(eventNode.hasProperty(ISO9075.encode("a property")));
+ }
+
+ @org.junit.Test public void testWriteEventPlusAppId() throws Exception {
+ final String topic = "write/event/test";
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put("a property", "some value");
+ // now we check if the application id is handled correctly
+ props.put(EventUtil.PROPERTY_APPLICATION, "foo");
+ this.handler.writeEvent(new Event(topic, props), null);
+ final Node rootNode = (Node) session.getItem(this.handler.repositoryPath);
+ final NodeIterator iter = rootNode.getNodes();
+ iter.hasNext();
+ final Node eventNode = iter.nextNode();
+ assertEquals(topic, eventNode.getProperty(EventHelper.NODE_PROPERTY_TOPIC).getString());
+ assertEquals(handler.applicationId, eventNode.getProperty(EventHelper.NODE_PROPERTY_APPLICATION).getString());
+ assertTrue(Calendar.getInstance().compareTo(eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getDate()) >= 0);
+ // as a starting point we just check if the properties property exists
+ assertTrue(eventNode.hasProperty(ISO9075.encode("a property")));
+ }
+}