Move Sling to new TLP location

git-svn-id: https://svn.eu.apache.org/repos/asf/sling/tags/org.apache.sling.event-2.0.4-incubator@785979 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..32a792a
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,8 @@
+Apache Sling Event
+Copyright 2008-2009 The Apache Software Foundation
+
+Apache Sling is based on source code originally developed 
+by Day Software (http://www.day.com/).
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
diff --git a/README.txt b/README.txt
new file mode 100644
index 0000000..3c5e9f8
--- /dev/null
+++ b/README.txt
@@ -0,0 +1,39 @@
+Apache Sling Event
+
+This bundle provides additional features for event handling, like replication
+events in a cluster, providing the support for timed events and jobs.
+
+
+Disclaimer
+==========
+Apache Sling is an effort undergoing incubation at The Apache Software Foundation (ASF),
+sponsored by the Apache Jackrabbit PMC. Incubation is required of all newly accepted
+projects until a further review indicates that the infrastructure, communications,
+and decision making process have stabilized in a manner consistent with other
+successful ASF projects. While incubation status is not necessarily a reflection of
+the completeness or stability of the code, it does indicate that the project has yet
+to be fully endorsed by the ASF.
+
+Getting Started
+===============
+
+This component uses a Maven 2 (http://maven.apache.org/) build
+environment. It requires a Java 5 JDK (or higher) and Maven (http://maven.apache.org/)
+2.0.7 or later. We recommend to use the latest Maven version.
+
+If you have Maven 2 installed, you can compile and
+package the jar using the following command:
+
+    mvn package
+
+See the Maven 2 documentation for other build features.
+
+The latest source code for this component is available in the
+Subversion (http://subversion.tigris.org/) source repository of
+the Apache Software Foundation. If you have Subversion installed,
+you can checkout the latest source using the following command:
+
+    svn checkout http://svn.apache.org/repos/asf/incubator/sling/trunk/extensions/event
+
+See the Subversion documentation for other source control features.
+
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..16c5ea6
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.sling</groupId>
+        <artifactId>sling</artifactId>
+        <version>5-incubator</version>
+        <relativePath>../../../parent/pom.xml</relativePath>
+    </parent>
+
+    <artifactId>org.apache.sling.event</artifactId>
+    <packaging>bundle</packaging>
+    <version>2.0.4-incubator</version>
+
+    <name>Apache Sling Event Support</name>
+    <description>
+        Support for eventing.
+    </description>
+
+    <scm>
+        <connection>scm:svn:http://svn.apache.org/repos/asf/incubator/sling/tags/org.apache.sling.event-2.0.4-incubator</connection>
+        <developerConnection>scm:svn:https://svn.apache.org/repos/asf/incubator/sling/tags/org.apache.sling.event-2.0.4-incubator</developerConnection>
+        <url>http://svn.apache.org/viewvc/incubator/sling/tags/org.apache.sling.event-2.0.4-incubator</url>
+    </scm>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-scr-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                        <Export-Package>
+                            org.apache.sling.event;version=${pom.version}
+                        </Export-Package>
+                        <Private-Package>
+                            org.apache.sling.event.impl,
+                            org.apache.jackrabbit.util
+                        </Private-Package>
+                        <!-- We need the dynamic import as events can contain any classes -->
+                        <DynamicImport-Package>*</DynamicImport-Package>
+                        <Sling-Nodetypes>
+                            SLING-INF/nodetypes/event.cnd
+                        </Sling-Nodetypes>
+                        <Sling-Namespaces>
+                            slingevent=http://sling.apache.org/jcr/event/1.0
+                        </Sling-Namespaces>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    <reporting>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <configuration>
+                    <excludePackageNames>
+                        org.apache.sling.event.impl
+                    </excludePackageNames>
+                </configuration>
+            </plugin>
+        </plugins>
+    </reporting>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.osgi.core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.osgi.compendium</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>javax.jcr</groupId>
+            <artifactId>jcr</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.api</artifactId>
+            <version>2.0.2-incubator</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.engine</artifactId>
+            <version>2.0.2-incubator</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.jcr.api</artifactId>
+            <version>2.0.2-incubator</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.jcr.resource</artifactId>
+            <version>2.0.4-incubator</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.commons.scheduler</artifactId>
+            <version>2.0.2-incubator</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.commons.threads</artifactId>
+            <version>2.0.2-incubator</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>jackrabbit-jcr-commons</artifactId>
+            <version>1.4.2</version>
+            <scope>compile</scope>
+        </dependency>
+        <!-- Testing -->
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.commons.testing</artifactId>
+            <version>2.0.4-incubator</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/src/main/java/org/apache/sling/event/EventPropertiesMap.java b/src/main/java/org/apache/sling/event/EventPropertiesMap.java
new file mode 100644
index 0000000..c69dc4b
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/EventPropertiesMap.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.osgi.service.event.Event;
+
+/**
+ * An implementation of a map that helps in dealing with properties
+ * of an OSGi event.
+ * This map implements both, the map and the dictionary interfaces.
+ */
+public class EventPropertiesMap
+    extends Dictionary<String, Object>
+    implements Map<String, Object>, Serializable {
+
+    private final Map<String, Object> delegatee;
+
+    /**
+     * Construct a new map out of an event object.
+     * The resulting map is modifiable. But any modification has
+     * no influence on the original properties of the event!
+     * @param event The event object.
+     */
+    public EventPropertiesMap(final Event event) {
+        // create a map out of the event properties
+        final Map<String, Object> props = new HashMap<String, Object>();
+        if ( event.getPropertyNames() != null ) {
+            for(final String key : event.getPropertyNames() ) {
+                props.put(key, event.getProperty(key));
+            }
+        }
+        this.delegatee = props;
+    }
+
+    /**
+     * Construct a new map out of another map.
+     * @param props The properties map object.
+     */
+    public EventPropertiesMap(final Map<String, Object> props) {
+        this.delegatee = props;
+    }
+
+    /**
+     * Construct a new map.
+     */
+    public EventPropertiesMap() {
+        this.delegatee = new HashMap<String, Object>();
+    }
+
+    /**
+     * @see java.util.Map#clear()
+     */
+    public void clear() {
+        delegatee.clear();
+    }
+
+    /**
+     * @see java.util.Map#containsKey(java.lang.Object)
+     */
+    public boolean containsKey(Object key) {
+        return delegatee.containsKey(key);
+    }
+
+    /**
+     * @see java.util.Map#containsValue(java.lang.Object)
+     */
+    public boolean containsValue(Object value) {
+        return delegatee.containsValue(value);
+    }
+
+    /**
+     * @see java.util.Map#entrySet()
+     */
+    public Set<java.util.Map.Entry<String, Object>> entrySet() {
+        return delegatee.entrySet();
+    }
+
+    /**
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    public boolean equals(Object o) {
+        return delegatee.equals(o);
+    }
+
+    /**
+     * @see java.util.Dictionary#get(java.lang.Object)
+     */
+    public Object get(Object key) {
+        return delegatee.get(key);
+    }
+
+    /**
+     * @see java.lang.Object#hashCode()
+     */
+    public int hashCode() {
+        return delegatee.hashCode();
+    }
+
+    /**
+     * @see java.util.Dictionary#isEmpty()
+     */
+    public boolean isEmpty() {
+        return delegatee.isEmpty();
+    }
+
+    /**
+     * @see java.util.Map#keySet()
+     */
+    public Set<String> keySet() {
+        return delegatee.keySet();
+    }
+
+    /**
+     * @see java.util.Dictionary#put(java.lang.Object, java.lang.Object)
+     */
+    public Object put(String key, Object value) {
+        return delegatee.put(key, value);
+    }
+
+    /**
+     * @see java.util.Map#putAll(java.util.Map)
+     */
+    public void putAll(Map<? extends String, ? extends Object> t) {
+        delegatee.putAll(t);
+    }
+
+    /**
+     * @see java.util.Dictionary#remove(java.lang.Object)
+     */
+    public Object remove(Object key) {
+        return delegatee.remove(key);
+    }
+
+    /**
+     * @see java.util.Dictionary#size()
+     */
+    public int size() {
+        return delegatee.size();
+    }
+
+    /**
+     * @see java.util.Map#values()
+     */
+    public Collection<Object> values() {
+        return delegatee.values();
+    }
+
+    /**
+     * @see java.util.Dictionary#elements()
+     */
+    public Enumeration<Object> elements() {
+        return Collections.enumeration(this.values());
+    }
+
+    /**
+     * @see java.util.Dictionary#keys()
+     */
+    public Enumeration<String> keys() {
+        return Collections.enumeration(this.keySet());
+    }
+
+    /**
+     * @see java.lang.Object#toString()
+     */
+    public String toString() {
+        return this.delegatee.toString();
+    }
+}
diff --git a/src/main/java/org/apache/sling/event/EventUtil.java b/src/main/java/org/apache/sling/event/EventUtil.java
new file mode 100644
index 0000000..503d18e
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/EventUtil.java
@@ -0,0 +1,615 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.jcr.Node;
+import javax.jcr.Property;
+import javax.jcr.PropertyIterator;
+import javax.jcr.PropertyType;
+import javax.jcr.RepositoryException;
+import javax.jcr.Value;
+import javax.jcr.ValueFactory;
+
+import org.apache.jackrabbit.util.ISO9075;
+import org.apache.sling.event.EventUtil.JobStatusNotifier.NotifierContext;
+import org.apache.sling.event.impl.AbstractRepositoryEventHandler;
+import org.apache.sling.event.impl.JobEventHandler;
+import org.osgi.service.event.Event;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The <code>EventUtil</code> class is an utility class for
+ * clustered environments.
+ */
+public abstract class EventUtil {
+
+    /** This event property indicates, if the event should be distributed in the cluster (default false). */
+    public static final String PROPERTY_DISTRIBUTE = "event.distribute";
+
+    /** This event property specifies the application node. */
+    public static final String PROPERTY_APPLICATION = "event.application";
+
+    /**
+     * Job Handling
+     */
+
+    /** The job topic property. */
+    public static final String PROPERTY_JOB_TOPIC = "event.job.topic";
+
+    /** The property for the unique event id. Value is of type String. */
+    public static final String PROPERTY_JOB_ID = "event.job.id";
+
+    /** The property to set if a job can be run parallel to any other job. */
+    public static final String PROPERTY_JOB_PARALLEL = "event.job.parallel";
+
+    /** The property to set if a job should only be run on the same app it has been created. */
+    public static final String PROPERTY_JOB_RUN_LOCAL = "event.job.run.local";
+
+    /** The property to track the retry count for jobs. Value is of type Integer. */
+    public static final String PROPERTY_JOB_RETRY_COUNT = "event.job.retrycount";
+
+    /** The property to for setting the maximum number of retries. Value is of type Integer. */
+    public static final String PROPERTY_JOB_RETRIES = "event.job.retries";
+
+    /** The property to set a retry delay. Value is of type Long and specifies milliseconds. */
+    public static final String PROPERTY_JOB_RETRY_DELAY = "event.job.retrydelay";
+
+    /** The property to set to put the jobs into a separate job queue. This property
+     * spcifies the name of the job queue. If the job queue does not exists yet
+     * a new queue is created.
+     * If a job queue is used, the jobs are never executed in parallel from this queue!
+     */
+    public static final String PROPERTY_JOB_QUEUE_NAME = "event.job.queuename";
+
+    /** If this property is set with any value, the queue processes the jobs in the same
+     * order as they have arrived.
+     * This property has only an effect if {@link #PROPERTY_JOB_QUEUE_NAME} is specified.
+     */
+    public static final String PROPERTY_JOB_QUEUE_ORDERED = "event.job.queueordered";
+
+    /** The topic for jobs. */
+    public static final String TOPIC_JOB = "org/apache/sling/event/job";
+
+    /**
+     * Timed Events
+     */
+
+    /** The topic for timed events. */
+    public static final String TOPIC_TIMED_EVENT = "org/apache/sling/event/timed";
+
+    /** The real topic of the event. */
+    public static final String PROPERTY_TIMED_EVENT_TOPIC = "event.topic.timed";
+
+    /** The property for the unique event id. */
+    public static final String PROPERTY_TIMED_EVENT_ID = "event.timed.id";
+
+    /** The scheduler expression for the timed event. */
+    public static final String PROPERTY_TIMED_EVENT_SCHEDULE = "event.timed.scheduler";
+
+    /** The period for the timed event. */
+    public static final String PROPERTY_TIMED_EVENT_PERIOD = "event.timed.period";
+
+    /** The date for the timed event. */
+    public static final String PROPERTY_TIMED_EVENT_DATE = "event.timed.date";
+
+    /**
+     * Utility Methods
+     */
+
+    /**
+     * Create a distributable event.
+     * A distributable event is distributed across the cluster.
+     * @param topic
+     * @param properties
+     * @return An OSGi event.
+     */
+    public static Event createDistributableEvent(String topic,
+                                                 Dictionary<String, Object> properties) {
+        final Dictionary<String, Object> newProperties;
+        // create a new dictionary
+        newProperties = new Hashtable<String, Object>();
+        if ( properties != null ) {
+            final Enumeration<String> e = properties.keys();
+            while ( e.hasMoreElements() ) {
+                final String key = e.nextElement();
+                newProperties.put(key, properties.get(key));
+            }
+        }
+        // for now the value has no meaning, so we just put an empty string in it.
+        newProperties.put(PROPERTY_DISTRIBUTE, "");
+        return new Event(topic, newProperties);
+    }
+
+    /**
+     * Should this event be distributed in the cluster?
+     * @param event
+     * @return <code>true</code> if the event should be distributed.
+     */
+    public static boolean shouldDistribute(Event event) {
+        return event.getProperty(PROPERTY_DISTRIBUTE) != null;
+    }
+
+    /**
+     * Is this a local event?
+     * @param event
+     * @return <code>true</code> if this is a local event
+     */
+    public static boolean isLocal(Event event) {
+        final String appId = getApplicationId(event);
+        return appId == null || appId.equals(AbstractRepositoryEventHandler.APPLICATION_ID);
+    }
+
+    /**
+     * Return the application id the event was created at.
+     * @param event
+     * @return The application id or null if the event has been created locally.
+     */
+    public static String getApplicationId(Event event) {
+        return (String)event.getProperty(PROPERTY_APPLICATION);
+    }
+
+    /**
+     * Is this a job event?
+     * This method checks if the event contains the {@link #PROPERTY_JOB_TOPIC}
+     * property.
+     * @param event The event to check.
+     * @return <code>true></code> if this is a job event.
+     */
+    public static boolean isJobEvent(Event event) {
+        return event.getProperty(PROPERTY_JOB_TOPIC) != null;
+    }
+
+    /**
+     * Check if this a job event and return the notifier context.
+     * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
+     */
+    private static JobStatusNotifier.NotifierContext getNotifierContext(final Event job) {
+        // check if this is a job event
+        if ( !isJobEvent(job) ) {
+            return null;
+        }
+        final JobStatusNotifier.NotifierContext ctx = (NotifierContext) job.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
+        if ( ctx == null ) {
+            throw new IllegalArgumentException("JobStatusNotifier context is not available in event properties.");
+        }
+        return ctx;
+    }
+
+    /**
+     * Notify a finished job.
+     * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
+     */
+    public static void finishedJob(Event job) {
+        final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+        if ( ctx != null ) {
+            ctx.notifier.finishedJob(job, ctx.eventNodePath, false);
+        }
+    }
+
+    /**
+     * Notify a failed job.
+     * @return <code>true</code> if the job has been rescheduled, <code>false</code> otherwise.
+     * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
+     */
+    public static boolean rescheduleJob(Event job) {
+        final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+        if ( ctx != null ) {
+           return ctx.notifier.finishedJob(job, ctx.eventNodePath, true);
+        }
+        return false;
+    }
+
+    /**
+     * Process a job in the background and notify its success.
+     * This method also sends an acknowledge message to the job event handler.
+     */
+    public static void processJob(final Event job, final JobProcessor processor) {
+        // first check for a notifier context to send an acknowledge
+        boolean notify = true;
+        final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+        if ( ctx != null ) {
+            if ( !ctx.notifier.sendAcknowledge(job, ctx.eventNodePath) ) {
+                // if we don't get an ack, someone else is already processing this job.
+                // we process but do not notify the job event handler.
+                LoggerFactory.getLogger(EventUtil.class).info("Someone else is already processing job {}.", job);
+                notify = false;
+            }
+        }
+        final boolean notifyResult = notify;
+
+        final Runnable task = new Runnable() {
+
+            /**
+             * @see java.lang.Runnable#run()
+             */
+            public void run() {
+                boolean result = false;
+                try {
+                    result = processor.process(job);
+                } catch (Throwable t) {
+                    LoggerFactory.getLogger(EventUtil.class).error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + job, t);
+                    // we don't reschedule if an exception occurs
+                    result = true;
+                } finally {
+                    if ( notifyResult ) {
+                        if ( result ) {
+                            EventUtil.finishedJob(job);
+                        } else {
+                            EventUtil.rescheduleJob(job);
+                        }
+                    }
+                }
+            }
+
+        };
+        // check if the job handler thread pool is available
+        if ( JobEventHandler.JOB_THREAD_POOL != null ) {
+            JobEventHandler.JOB_THREAD_POOL.execute(task);
+        } else {
+            // if we don't have a thread pool, we create the thread directly
+            // (this should never happen for jobs, but is a safe fallback and
+            // allows to call this method for other background processing.
+            new Thread(task).start();
+        }
+    }
+
+    /**
+     * This is a private interface which is only public for import reasons.
+     */
+    public static interface JobStatusNotifier {
+
+        String CONTEXT_PROPERTY_NAME = JobStatusNotifier.class.getName();
+
+        public static final class NotifierContext {
+            public final JobStatusNotifier notifier;
+            public final String eventNodePath;
+
+            public NotifierContext(JobStatusNotifier n, String path) {
+                this.notifier = n;
+                this.eventNodePath = path;
+            }
+        }
+
+        /**
+         * Send an acknowledge message that someone is processing the job.
+         * @param job The job.
+         * @param eventNodePath The storage node in the repository.
+         * @return <code>true</code> if the ack is ok, <code>false</code> otherwise (e.g. if
+         *   someone else already send an ack for this job.
+         */
+        boolean sendAcknowledge(Event job, String eventNodePath);
+
+        /**
+         * Notify that the job is finished.
+         * If the job is not rescheduled, a return value of <code>false</code> indicates an error
+         * during the processing. If the job should be rescheduled, <code>true</code> indicates
+         * that the job could be rescheduled. If an error occurs or the number of retries is
+         * exceeded, <code>false</code> will be returned.
+         * @param job The job.
+         * @param eventNodePath The storage node in the repository.
+         * @param reschedule Should the event be rescheduled?
+         * @return <code>true</code> if everything went fine, <code>false</code> otherwise.
+         */
+        boolean finishedJob(Event job, String eventNodePath, boolean reschedule);
+    }
+
+    /**
+     * Add all java properties as properties to the node.
+     * If the name and the value of a map entry can easily converted into
+     * a repository property, it is directly added. All other java
+     * properties are stored in one binary property.
+     *
+     * @param node The node where all properties are added to
+     * @param properties The map of properties.
+     * @param ignoreProps optional list of property which should be ignored
+     * @param binPropertyName The name of the binary property.
+     * @throws RepositoryException
+     */
+    public static void addProperties(final Node node,
+                                     final Map<String, Object> properties,
+                                     final String[] ignoreProps,
+                                     final String binPropertyName)
+    throws RepositoryException {
+        addProperties(node, new EventPropertiesMap(properties), ignoreProps, binPropertyName);
+    }
+
+    /**
+     * Add all java properties as properties to the node.
+     * If the name and the value of a map entry can easily converted into
+     * a repository property, it is directly added. All other java
+     * properties are stored in one binary property.
+     *
+     * @param node The node where all properties are added to
+     * @param properties The map of properties.
+     * @param ignoreProps optional list of property which should be ignored
+     * @param binPropertyName The name of the binary property.
+     * @throws RepositoryException
+     */
+    public static void addProperties(final Node node,
+                                     final EventPropertiesMap properties,
+                                     final String[] ignoreProps,
+                                     final String binPropertyName)
+    throws RepositoryException {
+        if ( properties != null ) {
+            final List<String> ignorePropList = (ignoreProps == null ? null : Arrays.asList(ignoreProps));
+            // check which props we can write directly and
+            // which we need to write as a binary blob
+            final List<String> propsAsBlob = new ArrayList<String>();
+
+            final Iterator<Map.Entry<String, Object>> i = properties.entrySet().iterator();
+            while ( i.hasNext() ) {
+                final Map.Entry<String, Object> current = i.next();
+
+                if (ignorePropList == null || !ignorePropList.contains(current.getKey()) ) {
+                    // sanity check
+                    if ( current.getValue() != null ) {
+                        if ( !setProperty(current.getKey(), current.getValue(), node) ) {
+                            propsAsBlob.add(current.getKey());
+                        }
+                    }
+                }
+            }
+            // write the remaining properties as a blob
+            if ( propsAsBlob.size() > 0 ) {
+                try {
+                    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    final ObjectOutputStream oos = new ObjectOutputStream(baos);
+                    oos.writeInt(propsAsBlob.size());
+                    for(final String propName : propsAsBlob) {
+                        oos.writeObject(propName);
+                        try {
+                            oos.writeObject(properties.get(propName));
+                        } catch (IOException ioe) {
+                            throw new RepositoryException("Unable to serialize property " + propName, ioe);
+                        }
+                    }
+                    oos.close();
+                    node.setProperty(binPropertyName, new ByteArrayInputStream(baos.toByteArray()));
+                } catch (IOException ioe) {
+                    throw new RepositoryException("Unable to serialize properties " + properties, ioe);
+                }
+            }
+        }
+    }
+
+    /**
+     * Read properties from a repository node and create a property map.
+     * @throws RepositoryException
+     * @throws ClassNotFoundException
+     */
+    public static EventPropertiesMap readProperties(final Node node,
+                                                    final String binPropertyName,
+                                                    final String[] ignorePrefixes)
+    throws RepositoryException, ClassNotFoundException {
+        final Map<String, Object> properties = new HashMap<String, Object>();
+
+        // check the properties blob
+        if ( node.hasProperty(binPropertyName) ) {
+            try {
+                final ObjectInputStream ois = new ObjectInputStream(node.getProperty(binPropertyName).getStream());
+                int length = ois.readInt();
+                for(int i=0;i<length;i++) {
+                    final String key = (String)ois.readObject();
+                    final Object value = ois.readObject();
+                    properties.put(key, value);
+                }
+            } catch (java.io.InvalidClassException ice) {
+                throw new ClassNotFoundException("Found invalid class.", ice);
+            } catch (IOException ioe) {
+                throw new RepositoryException("Unable to deserialize event properties.", ioe);
+            }
+        }
+        // now all properties that have been set directly
+        final PropertyIterator pI = node.getProperties();
+        while ( pI.hasNext() ) {
+            final Property p = pI.nextProperty();
+            boolean ignore = p.getName().startsWith("jcr:");
+            if ( !ignore && ignorePrefixes != null ) {
+                int index = 0;
+                while ( !ignore && index < ignorePrefixes.length ) {
+                    ignore = p.getName().startsWith(ignorePrefixes[index]);
+                    index++;
+                }
+            }
+            if ( !ignore ) {
+                final String name = ISO9075.decode(p.getName());
+                if ( p.getDefinition().isMultiple() ) {
+                    final Value[] values = p.getValues();
+                    if ( values.length > 0 ) {
+                        // get first value
+                        final Object firstObject = getPropertyValue(values[0]);
+                        final Object[] array;
+                        if ( firstObject instanceof Boolean ) {
+                            array = new Boolean[values.length];
+                        } else if ( firstObject instanceof Calendar ) {
+                            array = new Calendar[values.length];
+                        } else if ( firstObject instanceof Double ) {
+                            array = new Double[values.length];
+                        } else if ( firstObject instanceof Long ) {
+                            array = new Long[values.length];
+                        } else {
+                            array = new String[values.length];
+                        }
+                        array[0] = firstObject;
+                        int index = 1;
+                        while ( index < values.length ) {
+                            array[index] = getPropertyValue(values[index]);
+                            index++;
+                        }
+                        properties.put(name, array);
+                    }
+                } else {
+                    final Value value = p.getValue();
+                    final Object o = getPropertyValue(value);
+                    properties.put(name, o);
+                }
+            }
+        }
+        return new EventPropertiesMap(properties);
+    }
+
+    /**
+     * Return the converted repository property name
+     * @param name The java object property name
+     * @return The converted name or null if not possible.
+     */
+    public static String getNodePropertyName(final String name) {
+        // if name contains a colon, we can't set it as a property
+        if ( name.indexOf(':') != -1 ) {
+            return null;
+        }
+        return ISO9075.encode(name);
+    }
+
+    /**
+     * Return the converted repository property value
+     * @param valueFactory The value factory
+     * @param eventValue The event value
+     * @return The converted value or null if not possible
+     */
+    public static Value getNodePropertyValue(final ValueFactory valueFactory, final Object eventValue) {
+        final Value val;
+        if (eventValue instanceof Calendar) {
+            val = valueFactory.createValue((Calendar)eventValue);
+        } else if (eventValue instanceof Long) {
+            val = valueFactory.createValue((Long)eventValue);
+        } else if (eventValue instanceof Double) {
+            val = valueFactory.createValue(((Double)eventValue).doubleValue());
+        } else if (eventValue instanceof Boolean) {
+            val = valueFactory.createValue((Boolean) eventValue);
+        } else if (eventValue instanceof String) {
+            val = valueFactory.createValue((String)eventValue);
+        } else {
+            val = null;
+        }
+        return val;
+    }
+
+    /**
+     * Convert the value back to an object.
+     * @param value
+     * @return
+     * @throws RepositoryException
+     */
+    private static Object getPropertyValue(final Value value)
+    throws RepositoryException {
+        final Object o;
+        switch (value.getType()) {
+            case PropertyType.BOOLEAN:
+                o = value.getBoolean(); break;
+            case PropertyType.DATE:
+                o = value.getDate(); break;
+            case PropertyType.DOUBLE:
+                o = value.getDouble(); break;
+            case PropertyType.LONG:
+                o = value.getLong(); break;
+            case PropertyType.STRING:
+                o = value.getString(); break;
+            default: // this should never happen - we convert to a string...
+                o = value.getString();
+        }
+        return o;
+    }
+
+    /**
+     * Try to set the java property as a property of the node.
+     * @param name
+     * @param value
+     * @param node
+     * @return
+     * @throws RepositoryException
+     */
+    private static boolean setProperty(String name, Object value, Node node)
+    throws RepositoryException {
+        final String propName = getNodePropertyName(name);
+        if ( propName == null ) {
+            return false;
+        }
+        final ValueFactory fac = node.getSession().getValueFactory();
+        // check for multi value
+        if ( value.getClass().isArray() ) {
+            final Object[] array = (Object[])value;
+            // now we try to convert each value
+            // and check if all converted values have the same type
+            final Value[] values = new Value[array.length];
+            int index = 0;
+            for(final Object v : array ) {
+                values[index] = getNodePropertyValue(fac, v);
+                if ( values[index] == null ) {
+                    return false;
+                }
+                if ( index > 0 && !values[index-1].getClass().equals(values[index].getClass()) ) {
+                    return false;
+                }
+                index++;
+            }
+            node.setProperty(propName, values);
+            return true;
+        }
+        final Value val = getNodePropertyValue(fac, value);
+        if ( val != null ) {
+            node.setProperty(propName, val);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Improved toString method for an Event.
+     * This method prints out the event topic and all of the properties.
+     */
+    public static String toString(final Event e) {
+        if ( e == null ) {
+            return "<null>";
+        }
+        final StringBuffer buffer =new StringBuffer(e.getClass().getName());
+        buffer.append(" [topic=");
+        buffer.append(e.getTopic());
+        buffer.append(", properties=");
+        final String[] names = e.getPropertyNames();
+        if ( names != null ) {
+            for(int i=0;i<names.length;i++) {
+                if ( i>0) {
+                    buffer.append(",");
+                }
+                buffer.append(names[i]);
+                buffer.append('=');
+                buffer.append(e.getProperty(names[i]));
+            }
+        }
+        buffer.append("]");
+        return buffer.toString();
+    }
+}
diff --git a/src/main/java/org/apache/sling/event/JobProcessor.java b/src/main/java/org/apache/sling/event/JobProcessor.java
new file mode 100644
index 0000000..4f410bc
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/JobProcessor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event;
+
+import org.osgi.service.event.Event;
+
+/**
+ * A job processor processes a job in the background.
+ * It is used by {@link EventUtil#processJob(Event, JobProcessor)}.
+ */
+public interface JobProcessor {
+
+    /**
+     * Execute the job.
+     * If the job fails with a thrown exception/throwable, the process will not be rescheduled.
+     *
+     * @param job The event containing the job description.
+     * @return True if the job could be finished (either successful or by an error).
+     *         Return false if the job should be rescheduled.
+     */
+    boolean process(Event job);
+}
diff --git a/src/main/java/org/apache/sling/event/JobStatusProvider.java b/src/main/java/org/apache/sling/event/JobStatusProvider.java
new file mode 100644
index 0000000..e830323
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/JobStatusProvider.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.osgi.service.event.Event;
+
+/**
+ * This service provides the current job processing status.
+ */
+public interface JobStatusProvider {
+
+    /**
+     * This is a unique identifer which can be used to cancel the job.
+     */
+    String PROPERTY_EVENT_ID = "slingevent:eventId";
+
+    /**
+     * @deprecated Use {@link #getScheduledJobs(String)} instead.
+     */
+    @Deprecated
+    Collection<Event> scheduledJobs(String topic);
+
+    /**
+     * Return a list of currently schedulded jobs.
+     * @param topic Topic can be used as a filter, if it is non-null, only jobs with this topic will be returned.
+     * @return A non null collection.
+     */
+    Collection<Event> getScheduledJobs(String topic);
+
+    /**
+     * Return the jobs which are currently in processing. If there are several application nodes
+     * in the cluster, there could be more than one job in processing
+     * @param topic Topic can be used as a filter, if it is non-null, only jobs with this topic will be returned.
+     * @return A non null collection.
+     */
+    Collection<Event> getCurrentJobs(String topic);
+
+    /**
+     * Return a list of currently schedulded jobs.
+     * @param topic Topic can be used as a filter, if it is non-null, only jobs with this topic will be returned.
+     * @param filterProps A list of filter property maps. Each map acts like a template. The searched job
+     *                    must match the template (AND query). By providing several maps, different filters
+     *                    are possible (OR query).
+     * @return A non null collection.
+     */
+    Collection<Event> getScheduledJobs(String topic, Map<String, Object>... filterProps);
+
+    /**
+     * Return the jobs which are currently in processing. If there are several application nodes
+     * in the cluster, there could be more than one job in processing
+     * @param topic Topic can be used as a filter, if it is non-null, only jobs with this topic will be returned.
+     * @param filterProps A list of filter property maps. Each map acts like a template. The searched job
+     *                    must match the template (AND query). By providing several maps, different filters
+     *                    are possible (OR query).
+     * @return A non null collection.
+     */
+    Collection<Event> getCurrentJobs(String topic, Map<String, Object>... filterProps);
+
+    /**
+     * Return all jobs either running or scheduled.
+     * This is actually a convenience method and collects the results from {@link #getScheduledJobs(String, Map...)}
+     * and {@link #getCurrentJobs(String, Map...)}
+     * @param topic Topic can be used as a filter, if it is non-null, only jobs with this topic will be returned.
+     * @param filterProps A list of filter property maps. Each map acts like a template. The searched job
+     *                    must match the template (AND query). By providing several maps, different filters
+     *                    are possible (OR query).
+     * @return A non null collection.
+     */
+    Collection<Event> getAllJobs(String topic, Map<String, Object>... filterProps);
+
+    /**
+     * Cancel this job.
+     * @param jobId The unique identifer as found in the property {@link #PROPERTY_EVENT_ID}.
+     */
+    void cancelJob(String jobId);
+
+    /**
+     * Cancel this job.
+     * This method can be used if the topic and the provided job id is known.
+     * @param topic The job topic as put into the property {@link EventUtil#PROPERTY_JOB_TOPIC}.
+     * @param jobId The unique identifer as put into the property {@link EventUtil#PROPERTY_JOB_ID}.
+     */
+    void cancelJob(String topic, String jobId);
+
+    /**
+     * Wake up the named job queue.
+     * If a job failed, the job queue waits (sleeps) for a configured time. By calling this
+     * method, the job queue can be woken up and force an immediate reprocessing.
+     * @param jobQueueName The name of the queue.
+     */
+    void wakeUpJobQueue(final String jobQueueName);
+}
diff --git a/src/main/java/org/apache/sling/event/ThreadPool.java b/src/main/java/org/apache/sling/event/ThreadPool.java
new file mode 100644
index 0000000..1cca2c2
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/ThreadPool.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sling.event;
+
+/**
+ * The eventing thread pool is a special thread pool used for the eventing.
+ * The eventing uses a service registered as this interface.
+ * The default implementation is a configurable pool registered with
+ * commons threads.
+ *
+ * @version $Id$
+ */
+public interface ThreadPool extends org.apache.sling.commons.threads.ThreadPool {
+
+    // this is just a marker interface
+}
diff --git a/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java b/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java
new file mode 100644
index 0000000..ab931ae
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.osgi.service.event.Event;
+
+/**
+ * This service provides the current timed events status.
+ */
+public interface TimedEventStatusProvider {
+
+    /**
+     * This is a unique identifer which can be used to cancel the job.
+     */
+    String PROPERTY_EVENT_ID = "slingevent:eventId";
+
+    /**
+     * Return a list of currently schedulded events.
+     * @param topic Topic can be used as a filter, if it is non-null, only jobs with this topic will be returned.
+     * @param filterProps A list of filter property maps. Each map acts like a template. The searched event
+     *                    must match the template (AND query). By providing several maps, different filters
+     *                    are possible (OR query).
+     * @return A non null collection.
+     */
+    Collection<Event> getScheduledEvents(String topic, Map<String, Object>... filterProps);
+
+    /**
+     * Return the scheduled event with the given id.
+     * @return The scheduled event or null.
+     */
+    Event getScheduledEvent(String topic, String eventId, String jobId);
+
+    /**
+     * Cancel this timed event.
+     * @param jobId The unique identifer as found in the property {@link #PROPERTY_EVENT_ID}.
+     */
+    void cancelTimedEvent(String jobId);
+}
diff --git a/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java b/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
new file mode 100644
index 0000000..d754584
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Dictionary;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jcr.Node;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.observation.EventListener;
+
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.engine.SlingSettingsService;
+import org.apache.sling.event.EventPropertiesMap;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.JobStatusProvider;
+import org.apache.sling.event.ThreadPool;
+import org.apache.sling.jcr.api.SlingRepository;
+import org.apache.sling.jcr.resource.JcrResourceUtil;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for all event handlers in this package.
+ *
+ * @scr.component abstract="true" metatype="no"
+ * @scr.service interface="org.osgi.service.event.EventHandler"
+ */
+public abstract class AbstractRepositoryEventHandler
+    implements EventHandler, EventListener {
+
+    /** Default log. */
+    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** @scr.property valueRef="DEFAULT_PROPERTY_REPO_PATH" */
+    protected static final String CONFIG_PROPERTY_REPO_PATH = "repository.path";
+
+    /** Default path for the {@link #CONFIG_PROPERTY_REPO_PATH} */
+    private static final String DEFAULT_PROPERTY_REPO_PATH = "/sling/events";
+
+    /** @scr.reference */
+    protected SlingRepository repository;
+
+    /** @scr.reference */
+    protected EventAdmin eventAdmin;
+
+    /** Our application id. */
+    protected String applicationId;
+
+    /** The repository session to write into the repository. */
+    protected Session writerSession;
+
+    /** The path in the repository. */
+    protected String repositoryPath;
+
+    /** Is the background task still running? */
+    protected boolean running;
+
+    /** A local queue for serialising the event processing. */
+    protected final BlockingQueue<EventInfo> queue = new LinkedBlockingQueue<EventInfo>();
+
+    /** A local queue for writing received events into the repository. */
+    protected final BlockingQueue<Event> writeQueue = new LinkedBlockingQueue<Event>();
+
+    /**
+     * Our thread pool.
+     * @scr.reference */
+    protected ThreadPool threadPool;
+
+    /** @scr.reference
+     *  Sling settings service. */
+    protected SlingSettingsService settingsService;
+
+    public static String APPLICATION_ID;
+
+    /** List of ignored properties to write to the repository. */
+    private static final String[] IGNORE_PROPERTIES = new String[] {
+        EventUtil.PROPERTY_DISTRIBUTE,
+        EventUtil.PROPERTY_APPLICATION,
+        JobStatusProvider.PROPERTY_EVENT_ID,
+        EventUtil.JobStatusNotifier.CONTEXT_PROPERTY_NAME
+    };
+
+    /** List of ignored prefixes to read from the repository. */
+    private static final String[] IGNORE_PREFIXES = new String[] {
+        EventHelper.EVENT_PREFIX
+    };
+
+    /**
+     * Activate this component.
+     * @param context
+     * @throws RepositoryException
+     */
+    protected void activate(final ComponentContext context)
+    throws Exception {
+        this.applicationId = this.settingsService.getSlingId();
+        APPLICATION_ID = this.applicationId;
+        this.repositoryPath = OsgiUtil.toString(context.getProperties().get(
+            CONFIG_PROPERTY_REPO_PATH), DEFAULT_PROPERTY_REPO_PATH);
+
+        this.running = true;
+        // start writer thread
+        this.threadPool.execute(new Runnable() {
+            public void run() {
+                try {
+                    startWriterSession();
+                } catch (RepositoryException e) {
+                    // there is nothing we can do except log!
+                    logger.error("Error during session starting.", e);
+                    running = false;
+                }
+                try {
+                    processWriteQueue();
+                } catch (Throwable t) {
+                    logger.error("Writer thread stopped with exception: " + t.getMessage(), t);
+                    running = false;
+                }
+                stopWriterSession();
+            }
+        });
+        this.threadPool.execute(new Runnable() {
+            public void run() {
+                try {
+                    runInBackground();
+                } catch (Throwable t) {
+                    logger.error("Background thread stopped with exception: " + t.getMessage(), t);
+                    running = false;
+                }
+            }
+        });
+    }
+
+    protected abstract void runInBackground() throws RepositoryException;
+
+    protected abstract void processWriteQueue();
+
+    /**
+     * Deactivate this component.
+     * @param context
+     */
+    protected void deactivate(final ComponentContext context) {
+        // stop background threads by putting empty objects into the queue
+        this.running = false;
+        try {
+            this.writeQueue.put(new Event("some", null));
+        } catch (InterruptedException e) {
+            this.ignoreException(e);
+        }
+        try {
+            this.queue.put(new EventInfo());
+        } catch (InterruptedException e) {
+            this.ignoreException(e);
+        }
+    }
+
+    /**
+     * Create a new session.
+     * @return
+     * @throws RepositoryException
+     */
+    protected Session createSession()
+    throws RepositoryException {
+        final SlingRepository repo = this.repository;
+        if ( repo == null ) {
+            throw new RepositoryException("Repository is currently not available.");
+        }
+        return repo.loginAdministrative(null);
+    }
+
+    /**
+     * Start the repository session and add this handler as an observer
+     * for new events created on other nodes.
+     * @throws RepositoryException
+     */
+    protected void startWriterSession() throws RepositoryException {
+        this.writerSession = this.createSession();
+        if ( this.repositoryPath != null ) {
+            this.ensureRepositoryPath();
+        }
+    }
+
+    /**
+     * Stop the session.
+     */
+    protected void stopWriterSession() {
+        if ( this.writerSession != null ) {
+            try {
+                this.writerSession.getWorkspace().getObservationManager().removeEventListener(this);
+            } catch (RepositoryException e) {
+                // we just ignore it
+                this.logger.warn("Unable to remove event listener.", e);
+            }
+            this.writerSession.logout();
+            this.writerSession = null;
+        }
+    }
+
+    /**
+     * Check if the repository path already exists. If not, create it.
+     */
+    protected Node ensureRepositoryPath()
+    throws RepositoryException {
+        final Node node = JcrResourceUtil.createPath(this.repositoryPath,
+                                   EventHelper.NODETYPE_FOLDER,
+                                   EventHelper.NODETYPE_ORDERED_FOLDER,
+                                   this.writerSession, true);
+
+        return node;
+    }
+
+    /**
+     * Return the node type for the event.
+     */
+    protected String getEventNodeType() {
+        return EventHelper.EVENT_NODE_TYPE;
+    }
+
+    /**
+     * Write an event to the repository.
+     * @param e The event
+     * @param suggestName A suggest name/path for the node.
+     * @throws RepositoryException
+     * @throws IOException
+     */
+    protected Node writeEvent(Event e, String suggestedName)
+    throws RepositoryException {
+        // create new node with name of topic
+        final Node rootNode = this.ensureRepositoryPath();
+
+        final String nodeType = this.getEventNodeType();
+        final String nodeName;
+        if ( suggestedName != null ) {
+            nodeName = suggestedName;
+        } else {
+            final Calendar now = Calendar.getInstance();
+            final int sepPos = nodeType.indexOf(':');
+            nodeName = nodeType.substring(sepPos+1) + "-" + this.applicationId + "-" + now.getTime().getTime();
+        }
+        final Node eventNode = JcrResourceUtil.createPath(rootNode,
+                nodeName,
+                EventHelper.NODETYPE_FOLDER,
+                nodeType, false);
+
+        eventNode.setProperty(EventHelper.NODE_PROPERTY_CREATED, Calendar.getInstance());
+        eventNode.setProperty(EventHelper.NODE_PROPERTY_TOPIC, e.getTopic());
+        eventNode.setProperty(EventHelper.NODE_PROPERTY_APPLICATION, this.applicationId);
+
+        EventUtil.addProperties(eventNode,
+                                new EventPropertiesMap(e),
+                                IGNORE_PROPERTIES,
+                                EventHelper.NODE_PROPERTY_PROPERTIES);
+        this.addNodeProperties(eventNode, e);
+        rootNode.save();
+
+        return eventNode;
+    }
+
+    /**
+     * Read an event from the repository.
+     * @return
+     * @throws RepositoryException
+     * @throws IOException
+     * @throws ClassNotFoundException
+     */
+    protected Event readEvent(Node eventNode)
+    throws RepositoryException, ClassNotFoundException {
+        final String topic = eventNode.getProperty(EventHelper.NODE_PROPERTY_TOPIC).getString();
+        final EventPropertiesMap eventProps = EventUtil.readProperties(eventNode,
+                EventHelper.NODE_PROPERTY_PROPERTIES,
+                IGNORE_PREFIXES);
+
+        eventProps.put(JobStatusProvider.PROPERTY_EVENT_ID, eventNode.getPath());
+        this.addEventProperties(eventNode, eventProps);
+        try {
+            final Event event = new Event(topic, eventProps);
+            return event;
+        } catch (IllegalArgumentException iae) {
+            // this exception occurs if the topic is not correct (it should never happen,
+            // but you never know)
+            throw new RepositoryException("Unable to read event: " + iae.getMessage(), iae);
+        }
+    }
+
+    /**
+     * Add properties from the node to the event properties.
+     * @param eventNode The repository node.
+     * @param properties The event properties.
+     * @throws RepositoryException
+     */
+    protected void addEventProperties(Node eventNode, Dictionary<String, Object> properties)
+    throws RepositoryException {
+        // nothing to do
+    }
+
+    /**
+     * Add properties when storing event in repository.
+     * This method can be enhanced by sub classes.
+     * @param eventNode
+     * @param event
+     * @throws RepositoryException
+     */
+    protected void addNodeProperties(Node eventNode, Event event)
+    throws RepositoryException {
+        // nothing to do here
+    }
+
+    /**
+     * Helper method which just logs the exception in debug mode.
+     * @param e
+     */
+    protected void ignoreException(Exception e) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Ignore exception " + e.getMessage(), e);
+        }
+    }
+
+    protected static final class EventInfo {
+        public String nodePath;
+        public Event event;
+    }
+
+}
diff --git a/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java b/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
new file mode 100644
index 0000000..4e07571
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.util.Calendar;
+import java.util.Dictionary;
+
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.query.Query;
+
+import org.apache.jackrabbit.util.ISO8601;
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.event.EventUtil;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+/**
+ * This event handler distributes events across an application cluster.
+ * @scr.component inherit="true" label="%dist.events.name" description="%dist.events.description"
+ * @scr.property name="event.topics" value="*" private="true"
+ * @scr.property name="event.filter" value="(event.distribute=*)" private="true"
+ * @scr.property name="repository.path" value="/var/eventing/distribution" private="true"
+ *
+ * We schedule this event handler to run in the background and clean up
+ * obsolete events.
+ * @scr.service interface="java.lang.Runnable"
+ * @scr.property name="scheduler.period" value="1800" type="Long"
+ * @scr.property name="scheduler.concurrent" value="false" type="Boolean" private="true"
+ */
+public class DistributingEventHandler
+    extends AbstractRepositoryEventHandler
+    implements Runnable {
+
+    /** Default clean up time is 15 minutes. */
+    protected static final int DEFAULT_CLEANUP_PERIOD = 15;
+
+    /** @scr.property valueRef="DEFAULT_CLEANUP_PERIOD" type="Integer" */
+    protected static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
+
+    /** We remove everything which is older than 15min by default. */
+    protected int cleanupPeriod = DEFAULT_CLEANUP_PERIOD;
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#activate(org.osgi.service.component.ComponentContext)
+     */
+    protected void activate(ComponentContext context)
+    throws Exception {
+        @SuppressWarnings("unchecked")
+        final Dictionary<String, Object> props = context.getProperties();
+        this.cleanupPeriod = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD);
+        super.activate(context);
+    }
+
+    /**
+     * Return the query string for the clean up.
+     */
+    protected String getCleanUpQueryString() {
+        final Calendar deleteBefore = Calendar.getInstance();
+        deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
+        final String dateString = ISO8601.format(deleteBefore);
+
+        final StringBuffer buffer = new StringBuffer("/jcr:root");
+        buffer.append(this.repositoryPath);
+        buffer.append("//element(*, ");
+        buffer.append(getEventNodeType());
+        buffer.append(")[@");
+        buffer.append(EventHelper.NODE_PROPERTY_CREATED);
+        buffer.append(" < xs:dateTime('");
+        buffer.append(dateString);
+        buffer.append("')]");
+
+        return buffer.toString();
+    }
+
+    /**
+     * This method is invoked periodically.
+     * @see java.lang.Runnable#run()
+     */
+    public void run() {
+        if ( this.cleanupPeriod > 0 ) {
+            this.logger.debug("Cleaning up repository, removing all entries older than {} minutes.", this.cleanupPeriod);
+
+            final String queryString = this.getCleanUpQueryString();
+            // we create an own session for concurrency issues
+            Session s = null;
+            try {
+                s = this.createSession();
+                final Node parentNode = (Node)s.getItem(this.repositoryPath);
+                logger.debug("Executing query {}", queryString);
+                final Query q = s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH);
+                final NodeIterator iter = q.execute().getNodes();
+                int count = 0;
+                while ( iter.hasNext() ) {
+                    final Node eventNode = iter.nextNode();
+                    eventNode.remove();
+                    count++;
+                }
+                parentNode.save();
+                logger.debug("Removed {} entries from the repository.", count);
+
+            } catch (RepositoryException e) {
+                // in the case of an error, we just log this as a warning
+                this.logger.warn("Exception during repository cleanup.", e);
+            } finally {
+                if ( s != null ) {
+                    s.logout();
+                }
+            }
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
+     */
+    protected void processWriteQueue() {
+        while ( this.running ) {
+            // so let's wait/get the next job from the queue
+            Event event = null;
+            try {
+                event = this.writeQueue.take();
+            } catch (InterruptedException e) {
+                // we ignore this
+                this.ignoreException(e);
+            }
+            if ( event != null && this.running ) {
+                try {
+                    this.writeEvent(event, null);
+                } catch (Exception e) {
+                    this.logger.error("Exception during writing the event to the repository.", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#runInBackground()
+     */
+    protected void runInBackground() {
+        while ( this.running ) {
+            // so let's wait/get the next job from the queue
+            EventInfo info = null;
+            try {
+                info = this.queue.take();
+            } catch (InterruptedException e) {
+                // we ignore this
+                this.ignoreException(e);
+            }
+            if ( info != null && this.running ) {
+                if ( info.nodePath != null) {
+                    Session session = null;
+                    try {
+                        session = this.createSession();
+                        final Node eventNode = (Node)session.getItem(info.nodePath);
+                        final EventAdmin localEA = this.eventAdmin;
+                        if ( localEA != null ) {
+                            localEA.postEvent(this.readEvent(eventNode));
+                        } else {
+                            this.logger.error("Unable to post event as no event admin is available.");
+                        }
+                    } catch (Exception ex) {
+                        this.logger.error("Exception during reading the event from the repository.", ex);
+                    } finally {
+                        if ( session != null ) {
+                            session.logout();
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+     */
+    public void handleEvent(final Event event) {
+        try {
+            this.writeQueue.put(event);
+        } catch (InterruptedException ex) {
+            // we ignore this
+            this.ignoreException(ex);
+        }
+    }
+
+    /**
+     * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
+     */
+    public void onEvent(final EventIterator iterator) {
+        while ( iterator.hasNext() ) {
+            final javax.jcr.observation.Event event = iterator.nextEvent();
+            try {
+                final EventInfo info = new EventInfo();
+                info.nodePath = event.getPath();
+                this.queue.put(info);
+            } catch (InterruptedException ex) {
+                // we ignore this
+                this.ignoreException(ex);
+            } catch (RepositoryException ex) {
+                this.logger.error("Exception during reading the event from the repository.", ex);
+            }
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#addEventProperties(javax.jcr.Node, java.util.Dictionary)
+     */
+    protected void addEventProperties(Node eventNode, Dictionary<String, Object> properties)
+    throws RepositoryException {
+        super.addEventProperties(eventNode, properties);
+        properties.put(EventUtil.PROPERTY_APPLICATION, eventNode.getProperty(EventHelper.NODE_PROPERTY_APPLICATION).getString());
+    }
+
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#startWriterSession()
+     */
+    protected void startWriterSession() throws RepositoryException {
+        super.startWriterSession();
+        this.writerSession.getWorkspace().getObservationManager()
+            .addEventListener(this, javax.jcr.observation.Event.NODE_ADDED, this.repositoryPath, true, null, new String[] {this.getEventNodeType()}, true);
+    }
+}
diff --git a/src/main/java/org/apache/sling/event/impl/EventHelper.java b/src/main/java/org/apache/sling/event/impl/EventHelper.java
new file mode 100644
index 0000000..65a386b
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/EventHelper.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+
+/**
+ * Helper class defining some constants and utility methods.
+ */
+public abstract class EventHelper {
+
+    /** The name of the thread pool for the eventing stuff. */
+    public static final String THREAD_POOL_NAME = "SLING_EVENTING";
+
+    /** The namespace prefix. */
+    public static final String EVENT_PREFIX = "slingevent:";
+
+    public static final String NODE_PROPERTY_TOPIC = "slingevent:topic";
+    public static final String NODE_PROPERTY_APPLICATION = "slingevent:application";
+    public static final String NODE_PROPERTY_CREATED = "slingevent:created";
+    public static final String NODE_PROPERTY_PROPERTIES = "slingevent:properties";
+    public static final String NODE_PROPERTY_PROCESSOR = "slingevent:processor";
+    public static final String NODE_PROPERTY_JOBID = "slingevent:id";
+    public static final String NODE_PROPERTY_FINISHED = "slingevent:finished";
+    public static final String NODE_PROPERTY_TE_EXPRESSION = "slingevent:expression";
+    public static final String NODE_PROPERTY_TE_DATE = "slingevent:date";
+    public static final String NODE_PROPERTY_TE_PERIOD = "slingevent:period";
+
+    public static final String EVENT_NODE_TYPE = "slingevent:Event";
+    public static final String JOB_NODE_TYPE = "slingevent:Job";
+    public static final String TIMED_EVENT_NODE_TYPE = "slingevent:TimedEvent";
+
+    /** The nodetype for newly created intermediate folders */
+    public static final String NODETYPE_FOLDER = "sling:Folder";
+
+    /** The nodetype for newly created folders */
+    public static final String NODETYPE_ORDERED_FOLDER = "sling:OrderedFolder";
+
+    /** Allowed characters for a node name */
+    private static final String ALLOWED_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz0123456789_,.-+*#!¤$%&()=[]?";
+    /** Replacement characters for unallowed characters in a node name */
+    private static final char REPLACEMENT_CHAR = '_';
+
+    /**
+     * Filter the node name for not allowed characters and replace them.
+     * @param nodeName The suggested node name.
+     * @return The filtered node name.
+     */
+    public static String filter(final String nodeName) {
+        final StringBuffer sb  = new StringBuffer();
+        char lastAdded = 0;
+
+        for(int i=0; i < nodeName.length(); i++) {
+            final char c = nodeName.charAt(i);
+            char toAdd = c;
+
+            if (ALLOWED_CHARS.indexOf(c) < 0) {
+                if (lastAdded == REPLACEMENT_CHAR) {
+                    // do not add several _ in a row
+                    continue;
+                }
+                toAdd = REPLACEMENT_CHAR;
+
+            } else if(i == 0 && Character.isDigit(c)) {
+                sb.append(REPLACEMENT_CHAR);
+            }
+
+            sb.append(toAdd);
+            lastAdded = toAdd;
+        }
+
+        if (sb.length()==0) {
+            sb.append(REPLACEMENT_CHAR);
+        }
+
+        return sb.toString();
+    }
+
+}
diff --git a/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java b/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
new file mode 100644
index 0000000..ea9f91a
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.commons.threads.ThreadPoolConfig;
+import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.apache.sling.event.ThreadPool;
+import org.osgi.service.component.ComponentContext;
+
+
+/**
+ * The configurable eventing thread pool.
+ * @scr.component label="%event.pool.name" description="%event.pool.description"
+ * @scr.service interface="org.apache.sling.event.ThreadPool"
+ *
+ * @scr.property nameRef="PROPERTY_MIN_POOL_SIZE" valueRef="DEFAULT_MIN_POOL_SIZE"
+ * @scr.property nameRef="PROPERTY_MAX_POOL_SIZE" valueRef="DEFAULT_MAX_POOL_SIZE"
+ * @scr.property nameRef="PROPERTY_QUEUEL_SIZE" valueRef="DEFAULT_QUEUE_SIZE"
+ */
+public class EventingThreadPool implements ThreadPool {
+
+    /** @scr.reference */
+    protected ThreadPoolManager threadPoolManager;
+
+    /** The real thread pool used. */
+    private org.apache.sling.commons.threads.ThreadPool threadPool;
+
+    private static final String PROPERTY_MIN_POOL_SIZE = "minPoolSize";
+    private static final String PROPERTY_MAX_POOL_SIZE = "maxPoolSize";
+    private static final String PROPERTY_QUEUEL_SIZE = "queueSize";
+
+    private static final int DEFAULT_MIN_POOL_SIZE = 20; // this is sufficient for all threads + approx 10 job queues
+    private static final int DEFAULT_MAX_POOL_SIZE = 30;
+    private static final int DEFAULT_QUEUE_SIZE = 50; // queue upto 50 threads
+
+    /**
+     * Activate this component.
+     * @param context
+     */
+    protected void activate(final ComponentContext ctx) throws Exception {
+        // start background threads
+        if ( this.threadPoolManager == null ) {
+            throw new Exception("No ThreadPoolManager found.");
+        }
+        final ThreadPoolConfig config = new ThreadPoolConfig();
+        config.setMinPoolSize(OsgiUtil.toInteger(ctx.getProperties().get(PROPERTY_MIN_POOL_SIZE), DEFAULT_MIN_POOL_SIZE));
+        config.setMaxPoolSize(OsgiUtil.toInteger(ctx.getProperties().get(PROPERTY_MAX_POOL_SIZE), DEFAULT_MAX_POOL_SIZE));
+        config.setQueueSize(OsgiUtil.toInteger(ctx.getProperties().get(PROPERTY_QUEUEL_SIZE), DEFAULT_QUEUE_SIZE));
+        config.setShutdownGraceful(true);
+        threadPoolManager.create(EventHelper.THREAD_POOL_NAME, config);
+
+        this.threadPool = threadPoolManager.get(EventHelper.THREAD_POOL_NAME);
+        if ( this.threadPool == null ) {
+            throw new Exception("No thread pool with name " + EventHelper.THREAD_POOL_NAME + " found.");
+        }
+    }
+
+    /**
+     * Deactivate this component.
+     * @param context
+     */
+    protected void deactivate(final ComponentContext context) {
+        this.threadPool = null;
+    }
+
+    /**
+     * @see org.apache.sling.commons.threads.ThreadPool#execute(java.lang.Runnable)
+     */
+    public void execute(Runnable runnable) {
+        threadPool.execute(runnable);
+    }
+
+    /**
+     * @see org.apache.sling.commons.threads.ThreadPool#getConfiguration()
+     */
+    public ThreadPoolConfig getConfiguration() {
+        return threadPool.getConfiguration();
+    }
+
+    /**
+     * @see org.apache.sling.commons.threads.ThreadPool#getName()
+     */
+    public String getName() {
+        return threadPool.getName();
+    }
+
+    /**
+     * @see org.apache.sling.commons.threads.ThreadPool#shutdown()
+     */
+    public void shutdown() {
+        threadPool.shutdown();
+    }
+}
diff --git a/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java b/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
new file mode 100644
index 0000000..89a6bc3
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.sling.event.impl.AbstractRepositoryEventHandler.EventInfo;
+
+/**
+ * The job blocking queue extends the blocking queue by some
+ * functionality for the job event handling.
+ */
+public final class JobBlockingQueue extends LinkedBlockingQueue<EventInfo> {
+
+    private EventInfo eventInfo;
+
+    private final Object lock = new Object();
+
+    private boolean isWaiting = false;
+
+    private boolean markForCleanUp = false;
+
+    private boolean finished = false;
+
+    private boolean isSleeping = false;
+
+    private String schedulerJobName;
+    private Thread sleepingThread;
+
+    public EventInfo waitForFinish() throws InterruptedException {
+        this.isWaiting = true;
+        this.markForCleanUp = false;
+        this.lock.wait();
+        this.isWaiting = false;
+        final EventInfo object = this.eventInfo;
+        this.eventInfo = null;
+        return object;
+    }
+
+    public void markForCleanUp() {
+        if ( !this.isWaiting ) {
+            this.markForCleanUp = true;
+        }
+    }
+
+    public boolean isMarkedForCleanUp() {
+        return !this.isWaiting && this.markForCleanUp;
+    }
+
+    public void notifyFinish(EventInfo i) {
+        this.eventInfo = i;
+        this.lock.notify();
+    }
+
+    public Object getLock() {
+        return lock;
+    }
+
+    public boolean isWaiting() {
+        return this.isWaiting;
+    }
+
+    public boolean isFinished() {
+        return finished;
+    }
+
+    public void setFinished(boolean flag) {
+        this.finished = flag;
+    }
+
+    public void setSleeping(boolean flag) {
+        this.isSleeping = flag;
+        if ( !flag ) {
+            this.schedulerJobName = null;
+            this.sleepingThread = null;
+        }
+    }
+
+    public void setSleeping(boolean flag, String schedulerJobName) {
+        this.schedulerJobName = schedulerJobName;
+        this.setSleeping(flag);
+    }
+
+    public void setSleeping(boolean flag, Thread sleepingThread) {
+        this.sleepingThread = sleepingThread;
+        this.setSleeping(flag);
+    }
+
+    public String getSchedulerJobName() {
+        return this.schedulerJobName;
+    }
+
+    public Thread getSleepingThread() {
+        return this.sleepingThread;
+    }
+
+    public boolean isSleeping() {
+        return this.isSleeping;
+    }
+}
+
diff --git a/src/main/java/org/apache/sling/event/impl/JobEventHandler.java b/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
new file mode 100644
index 0000000..4a9b187
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
@@ -0,0 +1,1510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+
+import javax.jcr.Item;
+import javax.jcr.ItemExistsException;
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.Value;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.query.Query;
+import javax.jcr.query.QueryManager;
+
+import org.apache.jackrabbit.util.ISO8601;
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.event.EventPropertiesMap;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.JobStatusProvider;
+import org.osgi.framework.Constants;
+import org.osgi.service.component.ComponentConstants;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+
+/**
+ * An event handler for special job events.
+ *
+ * @scr.component label="%job.events.name" description="%job.events.description" immediate="true"
+ * @scr.service interface="org.apache.sling.event.JobStatusProvider"
+ * @scr.property name="event.topics" refValues="EventUtil.TOPIC_JOB"
+ *               values.updated="org/osgi/framework/BundleEvent/UPDATED"
+ *               values.started="org/osgi/framework/BundleEvent/STARTED"
+ *               private="true"
+ * @scr.property name="repository.path" value="/var/eventing/jobs" private="true"
+ * We schedule this event handler to run in the background and clean up
+ * obsolete events.
+ * @scr.service interface="java.lang.Runnable"
+ * @scr.property name="scheduler.period" value="300" type="Long" label="%jobscheduler.period.name" description="%jobscheduler.period.description"
+ * @scr.property name="scheduler.concurrent" value="false" type="Boolean" private="true"
+ */
+public class JobEventHandler
+    extends AbstractRepositoryEventHandler
+    implements EventUtil.JobStatusNotifier, JobStatusProvider, Runnable {
+
+    /** A map for keeping track of currently processed job topics. */
+    private final Map<String, Boolean> processingMap = new HashMap<String, Boolean>();
+
+    /** A map for the different job queues. */
+    private final Map<String, JobBlockingQueue> jobQueues = new HashMap<String, JobBlockingQueue>();
+
+    /** Default sleep time. */
+    private static final long DEFAULT_SLEEP_TIME = 30;
+
+    /** @scr.property valueRef="DEFAULT_SLEEP_TIME" */
+    private static final String CONFIG_PROPERTY_SLEEP_TIME = "sleep.time";
+
+    /** Default number of job retries. */
+    private static final int DEFAULT_MAX_JOB_RETRIES = 10;
+
+    /** @scr.property valueRef="DEFAULT_MAX_JOB_RETRIES" */
+    private static final String CONFIG_PROPERTY_MAX_JOB_RETRIES = "max.job.retries";
+
+    /** Default number of seconds to wait for an ack. */
+    private static final long DEFAULT_WAIT_FOR_ACK = 90; // by default we wait 90 secs
+
+    /** @scr.property valueRef="DEFAULT_MAXIMUM_PARALLEL_JOBS" */
+    private static final String CONFIG_PROPERTY_MAXIMUM_PARALLEL_JOBS = "max.parallel.jobs";
+
+    /** Default nubmer of parallel jobs. */
+    private static final long DEFAULT_MAXIMUM_PARALLEL_JOBS = 15;
+
+    /** @scr.property valueRef="DEFAULT_WAIT_FOR_ACK" */
+    private static final String CONFIG_PROPERTY_WAIT_FOR_ACK = "wait.for.ack";
+
+    /** We check every 30 secs by default. */
+    private long sleepTime;
+
+    /** How often should a job be retried by default. */
+    private int maxJobRetries;
+
+    /** How long do we wait for an ack (in ms) */
+    private long waitForAckMs;
+
+    /** Maximum parallel running jobs for a single queue. */
+    private long maximumParallelJobs;
+
+    /** Background session. */
+    private Session backgroundSession;
+
+    /** Unloaded jobs. */
+    private Set<String>unloadedJobs = new HashSet<String>();
+
+    /** List of deleted jobs. */
+    private Set<String>deletedJobs = new HashSet<String>();
+
+    /** Default clean up time is 5 minutes. */
+    private static final int DEFAULT_CLEANUP_PERIOD = 5;
+
+    /** @scr.property valueRef="DEFAULT_CLEANUP_PERIOD" type="Integer" label="%jobcleanup.period.name" description="%jobcleanup.period.description" */
+    private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
+
+    /** We remove everything which is older than 5 min by default. */
+    private int cleanupPeriod = DEFAULT_CLEANUP_PERIOD;
+
+    /** The scheduler for rescheduling jobs. @scr.reference */
+    private Scheduler scheduler;
+
+    /** Our component context. */
+    private ComponentContext componentContext;
+
+    /** The map of events we're currently processing. */
+    private final Map<String, StartedJobInfo> processingEventsList = new HashMap<String, StartedJobInfo>();
+
+    public static ThreadPool JOB_THREAD_POOL;
+
+    /** Sync lock */
+    private final Object writeLock = new Object();
+
+    /** Sync lock */
+    private final Object backgroundLock = new Object();
+
+    /** Number of parallel jobs for the main queue. */
+    private long parallelJobCount;
+
+    /**
+     * Activate this component.
+     * @param context
+     * @throws RepositoryException
+     */
+    protected void activate(final ComponentContext context)
+    throws Exception {
+        @SuppressWarnings("unchecked")
+        final Dictionary<String, Object> props = context.getProperties();
+        this.cleanupPeriod = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD);
+        this.sleepTime = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_SLEEP_TIME), DEFAULT_SLEEP_TIME);
+        this.maxJobRetries = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAX_JOB_RETRIES), DEFAULT_MAX_JOB_RETRIES);
+        this.waitForAckMs = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_WAIT_FOR_ACK), DEFAULT_WAIT_FOR_ACK) * 1000;
+        this.maximumParallelJobs = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_MAXIMUM_PARALLEL_JOBS), DEFAULT_MAXIMUM_PARALLEL_JOBS);
+        this.componentContext = context;
+        super.activate(context);
+        JOB_THREAD_POOL = this.threadPool;
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#deactivate(org.osgi.service.component.ComponentContext)
+     */
+    protected void deactivate(final ComponentContext context) {
+        super.deactivate(context);
+        synchronized ( this.jobQueues ) {
+            final Iterator<JobBlockingQueue> i = this.jobQueues.values().iterator();
+            while ( i.hasNext() ) {
+                final JobBlockingQueue jbq = i.next();
+                // wake up qeue
+                if ( jbq.isWaiting() ) {
+                    synchronized ( jbq.getLock()) {
+                        jbq.notifyFinish(null);
+                    }
+                }
+                // continue queue processing
+                try {
+                    jbq.put(new EventInfo());
+                } catch (InterruptedException e) {
+                    this.ignoreException(e);
+                }
+            }
+        }
+        if ( this.backgroundSession != null ) {
+            synchronized ( this.backgroundLock ) {
+                try {
+                    this.backgroundSession.getWorkspace().getObservationManager().removeEventListener(this);
+                } catch (RepositoryException e) {
+                    // we just ignore it
+                    this.logger.warn("Unable to remove event listener.", e);
+                }
+                this.backgroundSession.logout();
+                this.backgroundSession = null;
+            }
+        }
+        this.componentContext = null;
+        JOB_THREAD_POOL = null;
+    }
+
+    /**
+     * Return the query string for the clean up.
+     */
+    private String getCleanUpQueryString() {
+        final Calendar deleteBefore = Calendar.getInstance();
+        deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
+        final String dateString = ISO8601.format(deleteBefore);
+
+        final StringBuffer buffer = new StringBuffer("/jcr:root");
+        buffer.append(this.repositoryPath);
+        buffer.append("//element(*, ");
+        buffer.append(getEventNodeType());
+        buffer.append(")[@");
+        buffer.append(EventHelper.NODE_PROPERTY_FINISHED);
+        buffer.append(" < xs:dateTime('");
+        buffer.append(dateString);
+        buffer.append("')]");
+
+        return buffer.toString();
+    }
+
+    /**
+     * This method is invoked periodically.
+     * @see java.lang.Runnable#run()
+     */
+    public void run() {
+        if ( this.running ) {
+            // check for jobs that were started but never got an aknowledge
+            final long tooOld = System.currentTimeMillis() - this.waitForAckMs;
+            // to keep the synchronized block as fast as possible we just store the
+            // jobs to be removed in a new list and process this list afterwards
+            final List<StartedJobInfo> restartJobs = new ArrayList<StartedJobInfo>();
+            synchronized ( this.processingEventsList ) {
+                final Iterator<Map.Entry<String, StartedJobInfo>> i = this.processingEventsList.entrySet().iterator();
+                while ( i.hasNext() ) {
+                    final Map.Entry<String, StartedJobInfo> entry = i.next();
+                    if ( entry.getValue().started <= tooOld ) {
+                        restartJobs.add(entry.getValue());
+                    }
+                }
+            }
+            // remove obsolete jobs from the repository
+            if ( this.cleanupPeriod > 0 ) {
+                this.logger.debug("Cleaning up repository, removing all finished jobs older than {} minutes.", this.cleanupPeriod);
+
+                final String queryString = this.getCleanUpQueryString();
+                // we create an own session for concurrency issues
+                Session s = null;
+                try {
+                    s = this.createSession();
+                    final Node parentNode = (Node)s.getItem(this.repositoryPath);
+                    logger.debug("Executing query {}", queryString);
+                    final Query q = s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH);
+                    final NodeIterator iter = q.execute().getNodes();
+                    int count = 0;
+                    while ( iter.hasNext() ) {
+                        final Node eventNode = iter.nextNode();
+                        eventNode.remove();
+                        count++;
+                    }
+                    parentNode.save();
+                    logger.debug("Removed {} entries from the repository.", count);
+
+                } catch (RepositoryException e) {
+                    // in the case of an error, we just log this as a warning
+                    this.logger.warn("Exception during repository cleanup.", e);
+                } finally {
+                    if ( s != null ) {
+                        s.logout();
+                    }
+                }
+            }
+            // restart jobs is now a list of potential candidates, we now have to check
+            // each candidate separately again!
+            if ( restartJobs.size() > 0 ) {
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException e) {
+                    // we just ignore this
+                    e.printStackTrace();
+                }
+            }
+            final Iterator<StartedJobInfo> jobIter = restartJobs.iterator();
+            while ( jobIter.hasNext() ) {
+                final StartedJobInfo info = jobIter.next();
+                boolean process = false;
+                synchronized ( this.processingEventsList ) {
+                    process = this.processingEventsList.remove(info.nodePath) != null;
+                }
+                if ( process ) {
+                    this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", info.event, info.nodePath);
+                    this.finishedJob(info.event, info.nodePath, true);
+                }
+            }
+
+            // check for idle threads
+            synchronized ( this.jobQueues ) {
+                final Iterator<Map.Entry<String, JobBlockingQueue>> i = this.jobQueues.entrySet().iterator();
+                while ( i.hasNext() ) {
+                    final Map.Entry<String, JobBlockingQueue> current = i.next();
+                    final JobBlockingQueue jbq = current.getValue();
+                    if ( jbq.size() == 0 ) {
+                        if ( jbq.isMarkedForCleanUp() ) {
+                            // set to finished
+                            jbq.setFinished(true);
+                            // wake up
+                            try {
+                                jbq.put(new EventInfo());
+                            } catch (InterruptedException e) {
+                                this.ignoreException(e);
+                            }
+                            // remove
+                            i.remove();
+                        } else {
+                            // mark to be removed during next cycle
+                            jbq.markForCleanUp();
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
+     */
+    protected void processWriteQueue() {
+        while ( this.running ) {
+            // so let's wait/get the next job from the queue
+            Event event = null;
+            try {
+                event = this.writeQueue.take();
+            } catch (InterruptedException e) {
+                // we ignore this
+                this.ignoreException(e);
+            }
+            if ( event != null && this.running ) {
+                logger.debug("Persisting job {}", event);
+                try {
+                    this.writerSession.refresh(false);
+                } catch (RepositoryException re) {
+                    // we just ignore this
+                    this.ignoreException(re);
+                }
+                final EventInfo info = new EventInfo();
+                info.event = event;
+                final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
+                final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+                final String nodePath = this.getNodePath(jobTopic, jobId);
+
+                // if the job has no job id, we can just write the job to the repo and don't
+                // need locking
+                if ( jobId == null ) {
+                    try {
+                        final Node eventNode = this.writeEvent(event, nodePath);
+                        info.nodePath = eventNode.getPath();
+                    } catch (RepositoryException re ) {
+                        // something went wrong, so let's log it
+                        this.logger.error("Exception during writing new job '" + EventUtil.toString(event) + "' to repository at " + nodePath, re);
+                    }
+                } else {
+                    try {
+                        // let's first search for an existing node with the same id
+                        final Node parentNode = this.ensureRepositoryPath();
+                        Node foundNode = null;
+                        if ( parentNode.hasNode(nodePath) ) {
+                            foundNode = parentNode.getNode(nodePath);
+                        }
+                        if ( foundNode != null ) {
+                            // if the node is locked, someone else was quicker
+                            // and we don't have to process this job
+                            if ( !foundNode.isLocked() ) {
+                                // node is already in repository, so if not finished we just use it
+                                // otherwise it has already been processed
+                                try {
+                                    if ( !foundNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED) ) {
+                                        info.nodePath = foundNode.getPath();
+                                    }
+                                } catch (RepositoryException re) {
+                                    // if anything goes wrong, it means that (hopefully) someone
+                                    // else is processing this node
+                                }
+                            }
+                        } else {
+                            // We now write the event into the repository
+                            try {
+                                final Node eventNode = this.writeEvent(event, nodePath);
+                                info.nodePath = eventNode.getPath();
+                            } catch (ItemExistsException iee) {
+                                // someone else did already write this node in the meantime
+                                // nothing to do for us
+                            }
+                        }
+                    } catch (RepositoryException re ) {
+                        // something went wrong, so let's log it
+                        this.logger.error("Exception during writing new job '" + event + "' to repository at " + nodePath, re);
+                    }
+                }
+                // if we were able to write the event into the repository
+                // we will queue it for processing
+                if ( info.nodePath != null ) {
+                    try {
+                        this.queue.put(info);
+                    } catch (InterruptedException e) {
+                        // this should never happen
+                        this.ignoreException(e);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * This method runs in the background and processes the local queue.
+     */
+    protected void runInBackground() throws RepositoryException {
+        this.backgroundSession = this.createSession();
+        this.backgroundSession.getWorkspace().getObservationManager()
+                .addEventListener(this,
+                                  javax.jcr.observation.Event.PROPERTY_REMOVED
+                                    |javax.jcr.observation.Event.NODE_REMOVED,
+                                  this.repositoryPath,
+                                  true,
+                                  null,
+                                  new String[] {this.getEventNodeType()},
+                                  true);
+        // give the system some time to start
+        try {
+            Thread.sleep(1000 * 30); // 30 secs
+        } catch (InterruptedException e) {
+            this.ignoreException(e);
+        }
+        // load unprocessed jobs from repository
+        if ( this.running ) {
+            this.loadJobs();
+        } else {
+            final ComponentContext ctx = this.componentContext;
+            // deactivate
+            if ( ctx != null ) {
+                logger.info("Deactivating component {} due to errors during startup.", ctx.getProperties().get(Constants.SERVICE_ID));
+                final String name = (String) componentContext.getProperties().get(
+                    ComponentConstants.COMPONENT_NAME);
+                ctx.disableComponent(name);
+            }
+        }
+        while ( this.running ) {
+            // so let's wait/get the next job from the queue
+            EventInfo info = null;
+            try {
+                info = this.queue.take();
+            } catch (InterruptedException e) {
+                // we ignore this
+                this.ignoreException(e);
+            }
+
+            if ( info != null && this.running ) {
+                // check for local only jobs and remove them from the queue if they're meant
+                // for another application node
+                final String appId = (String)info.event.getProperty(EventUtil.PROPERTY_APPLICATION);
+                if ( info.event.getProperty(EventUtil.PROPERTY_JOB_RUN_LOCAL) != null
+                    && appId != null && !this.applicationId.equals(appId) ) {
+                    info = null;
+                }
+
+                // check if we should put this into a separate queue
+                if ( info != null && info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
+                    final String queueName = (String)info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME);
+                    synchronized ( this.jobQueues ) {
+                        BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName);
+                        if ( jobQueue == null ) {
+                            final JobBlockingQueue jq = new JobBlockingQueue();
+                            jobQueue = jq;
+                            this.jobQueues.put(queueName, jq);
+                            // Start background thread
+                            this.threadPool.execute(new Runnable() {
+
+                                /**
+                                 * @see java.lang.Runnable#run()
+                                 */
+                                public void run() {
+                                    while ( running && !jq.isFinished() ) {
+                                        logger.info("Starting job queue {}", queueName);
+                                        try {
+                                            runJobQueue(queueName, jq);
+                                        } catch (Throwable t) {
+                                            logger.error("Job queue stopped with exception: " + t.getMessage() + ". Restarting.", t);
+                                        }
+                                    }
+                                }
+
+                            });
+                        }
+                        try {
+                            jobQueue.put(info);
+                        } catch (InterruptedException e) {
+                            // this should never happen
+                            this.ignoreException(e);
+                        }
+                    }
+                    // don't process this here
+                    info = null;
+                }
+
+                // if we still have a job, process it
+                if ( info != null ) {
+                    this.executeJob(info, null);
+                }
+            }
+        }
+    }
+
+    /**
+     * Execute a job queue
+     * @param queueName The name of the job queue
+     * @param jobQueue The job queue
+     */
+    private void runJobQueue(final String queueName, final JobBlockingQueue jobQueue) {
+        EventInfo info = null;
+        while ( this.running && !jobQueue.isFinished() ) {
+            if ( info == null ) {
+                // so let's wait/get the next job from the queue
+                try {
+                    info = jobQueue.take();
+                } catch (InterruptedException e) {
+                    // we ignore this
+                    this.ignoreException(e);
+                }
+            }
+
+            if ( info != null && this.running && !jobQueue.isFinished() ) {
+                synchronized ( jobQueue.getLock()) {
+                    final EventInfo processInfo = info;
+                    info = null;
+                    if ( this.executeJob(processInfo, jobQueue) ) {
+                        EventInfo newInfo = null;
+                        try {
+                            newInfo = jobQueue.waitForFinish();
+                        } catch (InterruptedException e) {
+                            this.ignoreException(e);
+                        }
+                        // if we have an info, this is a reschedule
+                        if ( newInfo != null ) {
+                            final EventInfo newEventInfo = newInfo;
+                            final Event job = newInfo.event;
+
+                            // is this an ordered queue?
+                            final boolean orderedQueue = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
+
+                            if ( orderedQueue ) {
+                                // we just sleep for the delay time - if none, we continue and retry
+                                // this job again
+                                if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+                                    final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+                                    jobQueue.setSleeping(true, Thread.currentThread());
+                                    try {
+                                        Thread.sleep(delay);
+                                    } catch (InterruptedException e) {
+                                        this.ignoreException(e);
+                                    } finally {
+                                        jobQueue.setSleeping(false);
+                                    }
+                                }
+                                info = newInfo;
+                            } else {
+                                // delay rescheduling?
+                                if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+                                    final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+                                    final Date fireDate = new Date();
+                                    fireDate.setTime(System.currentTimeMillis() + delay);
+
+                                    final String schedulerJobName = "Waiting:" + queueName;
+                                    final Runnable t = new Runnable() {
+                                        public void run() {
+                                            jobQueue.setSleeping(true, schedulerJobName);
+                                            try {
+                                                jobQueue.put(newEventInfo);
+                                            } catch (InterruptedException e) {
+                                                // this should never happen
+                                                ignoreException(e);
+                                            } finally {
+                                                jobQueue.setSleeping(false);
+                                            }
+                                        }
+                                    };
+                                    try {
+                                        this.scheduler.fireJobAt(schedulerJobName, t, null, fireDate);
+                                    } catch (Exception e) {
+                                        // we ignore the exception and just put back the job in the queue
+                                        ignoreException(e);
+                                        t.run();
+                                    }
+                                } else {
+                                    // put directly into queue
+                                    try {
+                                        jobQueue.put(newInfo);
+                                    } catch (InterruptedException e) {
+                                        // this should never happen
+                                        this.ignoreException(e);
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Process a job
+     */
+    private boolean executeJob(final EventInfo info, final BlockingQueue<EventInfo> jobQueue) {
+        boolean putback = false;
+        boolean wait = false;
+        synchronized (this.backgroundLock) {
+            try {
+                this.backgroundSession.refresh(false);
+                // check if the node still exists
+                if ( this.backgroundSession.itemExists(info.nodePath)
+                     && !this.backgroundSession.itemExists(info.nodePath + "/" + EventHelper.NODE_PROPERTY_FINISHED)) {
+                    final Event event = info.event;
+                    final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+                    final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null
+                                                    || event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+
+                    // check how we can process this job
+                    // if parallel processing is allowed, we can just process
+                    // if not we should check if any other job with the same topic is currently running
+                    boolean process = parallelProcessing;
+                    if ( !parallelProcessing ) {
+                        synchronized ( this.processingMap ) {
+                            final Boolean value = this.processingMap.get(jobTopic);
+                            if ( value == null || !value.booleanValue() ) {
+                                this.processingMap.put(jobTopic, Boolean.TRUE);
+                                process = true;
+                            }
+                        }
+
+                    } else {
+                        // check number of parallel jobs for main queue
+                        if ( jobQueue == null && this.parallelJobCount >= this.maximumParallelJobs ) {
+                            process = false;
+                            wait = true;
+                        }
+                    }
+                    if ( process ) {
+                        boolean unlock = true;
+                        try {
+                            final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
+                            if ( !eventNode.isLocked() ) {
+                                // lock node
+                                try {
+                                    eventNode.lock(false, true);
+                                } catch (RepositoryException re) {
+                                    // lock failed which means that the node is locked by someone else, so we don't have to requeue
+                                    process = false;
+                                }
+                                if ( process ) {
+                                    unlock = false;
+                                    this.processJob(info.event, eventNode, jobQueue == null);
+                                    return true;
+                                }
+                            }
+                        } catch (RepositoryException e) {
+                            // ignore
+                            this.ignoreException(e);
+                        } finally {
+                            if ( unlock && !parallelProcessing ) {
+                                synchronized ( this.processingMap ) {
+                                    this.processingMap.put(jobTopic, Boolean.FALSE);
+                                }
+                            }
+                        }
+                    } else {
+                        try {
+                            // check if the node is in processing or already finished
+                            final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
+                            if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
+                                putback = true;
+                            }
+                        } catch (RepositoryException e) {
+                            // ignore
+                            this.ignoreException(e);
+                        }
+                    }
+                }
+            } catch (RepositoryException re) {
+                this.ignoreException(re);
+            }
+
+        }
+        // if this is the main queue and we have reached the max number of parallel jobs
+        // we wait a little bit before continuing
+        if ( wait ) {
+            try {
+                Thread.sleep(sleepTime * 1000);
+            } catch (InterruptedException ie) {
+                // ignore
+                ignoreException(ie);
+            }
+        }
+        // if we have to put back the job, we do it now
+        if ( putback ) {
+            final EventInfo eInfo = info;
+            final Date fireDate = new Date();
+            fireDate.setTime(System.currentTimeMillis() + this.sleepTime * 1000);
+
+                // we put it back into the queue after a specific time
+            final Runnable r = new Runnable() {
+
+                /**
+                 * @see java.lang.Runnable#run()
+                 */
+                public void run() {
+                    try {
+                        queue.put(eInfo);
+                    } catch (InterruptedException e) {
+                        // ignore
+                        ignoreException(e);
+                    }
+                }
+
+            };
+            try {
+                this.scheduler.fireJobAt(null, r, null, fireDate);
+            } catch (Exception e) {
+                // we ignore the exception
+                ignoreException(e);
+                // then wait for the time and readd the job
+                try {
+                    Thread.sleep(sleepTime * 1000);
+                } catch (InterruptedException ie) {
+                    // ignore
+                    ignoreException(ie);
+                }
+                r.run();
+            }
+        }
+        return false;
+    }
+
+    /**
+     * @see org.apache.sling.engine.event.impl.JobPersistenceHandler#getEventNodeType()
+     */
+    protected String getEventNodeType() {
+        return EventHelper.JOB_NODE_TYPE;
+    }
+
+    /**
+     * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+     */
+    public void handleEvent(final Event event) {
+        logger.debug("Receiving event {}", event);
+        // we ignore remote job events
+        if ( EventUtil.isLocal(event) ) {
+            // check for bundle event
+            if ( event.getTopic().equals(EventUtil.TOPIC_JOB)) {
+                logger.debug("Handling local job {}", event);
+                // job event
+                final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+
+                //  job topic must be set, otherwise we ignore this event!
+                if ( jobTopic != null ) {
+                    // queue the event in order to respond quickly
+                    try {
+                        this.writeQueue.put(event);
+                    } catch (InterruptedException e) {
+                        // this should never happen
+                        this.ignoreException(e);
+                    }
+                } else {
+                    this.logger.warn("Event does not contain job topic: {}", event);
+                }
+
+            } else {
+                // bundle event started or updated
+                boolean doIt = false;
+                synchronized ( this.unloadedJobs ) {
+                    if ( this.unloadedJobs.size() > 0 ) {
+                        doIt = true;
+                    }
+                }
+                if ( doIt ) {
+                    final Runnable t = new Runnable() {
+
+                        public void run() {
+                            synchronized (unloadedJobs) {
+                                Session s = null;
+                                final Set<String> newUnloadedJobs = new HashSet<String>();
+                                newUnloadedJobs.addAll(unloadedJobs);
+                                try {
+                                    s = createSession();
+                                    for(String path : unloadedJobs ) {
+                                        newUnloadedJobs.remove(path);
+                                        try {
+                                            if ( s.itemExists(path) ) {
+                                                final Node eventNode = (Node) s.getItem(path);
+                                                if ( !eventNode.isLocked() ) {
+                                                    try {
+                                                        final EventInfo info = new EventInfo();
+                                                        info.event = readEvent(eventNode);
+                                                        info.nodePath = path;
+                                                        try {
+                                                            queue.put(info);
+                                                        } catch (InterruptedException e) {
+                                                            // we ignore this exception as this should never occur
+                                                            ignoreException(e);
+                                                        }
+                                                    } catch (ClassNotFoundException cnfe) {
+                                                        newUnloadedJobs.add(path);
+                                                        ignoreException(cnfe);
+                                                    }
+                                                }
+                                            }
+                                        } catch (RepositoryException re) {
+                                            // we ignore this and readd
+                                            newUnloadedJobs.add(path);
+                                            ignoreException(re);
+                                        }
+                                    }
+                                } catch (RepositoryException re) {
+                                    // unable to create session, so we try it again next time
+                                    ignoreException(re);
+                                } finally {
+                                    if ( s != null ) {
+                                        s.logout();
+                                    }
+                                    unloadedJobs.clear();
+                                    unloadedJobs.addAll(newUnloadedJobs);
+                                }
+                            }
+                        }
+
+                    };
+                    this.threadPool.execute(t);
+                }
+            }
+        }
+    }
+
+    /**
+     * Create a unique node path (folder and name) for the job.
+     */
+    private String getNodePath(final String jobTopic, final String jobId) {
+        if ( jobId != null ) {
+            return jobTopic.replace('/', '.') + "/" + EventHelper.filter(jobId);
+        }
+        return jobTopic.replace('/', '.') + "/Job " + UUID.randomUUID().toString();
+    }
+
+    /**
+     * Process a job and unlock the node in the repository.
+     * @param event The original event.
+     * @param eventNode The node in the repository where the job is stored.
+     * @param isMainQueue Is this the main queue?
+     */
+    private void processJob(Event event, Node eventNode, boolean isMainQueue)  {
+        final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null
+                                           || event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+        final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+        boolean unlock = true;
+        try {
+            if ( isMainQueue && !parallelProcessing ) {
+                this.parallelJobCount++;
+            }
+            final String nodePath = eventNode.getPath();
+            final Event jobEvent = this.getJobEvent(event, nodePath);
+            eventNode.setProperty(EventHelper.NODE_PROPERTY_PROCESSOR, this.applicationId);
+            eventNode.save();
+            final EventAdmin localEA = this.eventAdmin;
+            if ( localEA != null ) {
+                final StartedJobInfo jobInfo = new StartedJobInfo(jobEvent, nodePath, System.currentTimeMillis());
+                // let's add the event to our processing list
+                synchronized ( this.processingEventsList ) {
+                    this.processingEventsList.put(nodePath, jobInfo);
+                }
+
+                // we need async delivery, otherwise we might create a deadlock
+                // as this method runs inside a synchronized block and the finishedJob
+                // method as well!
+                localEA.postEvent(jobEvent);
+                // do not unlock if sending was successful
+                unlock = false;
+            } else {
+                this.logger.error("Job event can't be sent as no event admin is available.");
+            }
+        } catch (RepositoryException re) {
+            // if an exception occurs, we just log
+            this.logger.error("Exception during job processing.", re);
+        } finally {
+            if ( unlock ) {
+                if ( isMainQueue && !parallelProcessing ) {
+                    this.parallelJobCount--;
+                }
+                if ( !parallelProcessing ) {
+                    synchronized ( this.processingMap ) {
+                        this.processingMap.put(jobTopic, Boolean.FALSE);
+                    }
+                }
+                // unlock node
+                try {
+                    eventNode.unlock();
+                } catch (RepositoryException e) {
+                    // if unlock fails, we silently ignore this
+                    this.ignoreException(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Create the real job event.
+     * This generates a new event object with the same properties, but with the
+     * {@link EventUtil#PROPERTY_JOB_TOPIC} topic.
+     * @param e The job event.
+     * @return The real job event.
+     */
+    private Event getJobEvent(Event e, String nodePath) {
+        final String eventTopic = (String)e.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+        final Dictionary<String, Object> properties = new EventPropertiesMap(e);
+        // put properties for finished job callback
+        properties.put(EventUtil.JobStatusNotifier.CONTEXT_PROPERTY_NAME,
+                new EventUtil.JobStatusNotifier.NotifierContext(this, nodePath));
+        return new Event(eventTopic, properties);
+    }
+
+    /**
+     * @see org.apache.sling.engine.event.impl.JobPersistenceHandler#addNodeProperties(javax.jcr.Node, org.osgi.service.event.Event)
+     */
+    protected void addNodeProperties(Node eventNode, Event event)
+    throws RepositoryException {
+        super.addNodeProperties(eventNode, event);
+        eventNode.setProperty(EventHelper.NODE_PROPERTY_TOPIC, (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC));
+        final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
+        if ( jobId != null ) {
+            eventNode.setProperty(EventHelper.NODE_PROPERTY_JOBID, jobId);
+        }
+        final long retryCount = OsgiUtil.toLong(event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT), 0);
+        final long retries = OsgiUtil.toLong(event.getProperty(EventUtil.PROPERTY_JOB_RETRIES), this.maxJobRetries);
+        eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT, retryCount);
+        eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRIES, retries);
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#addEventProperties(javax.jcr.Node, java.util.Dictionary)
+     */
+    protected void addEventProperties(Node eventNode,
+                                      Dictionary<String, Object> properties)
+    throws RepositoryException {
+        super.addEventProperties(eventNode, properties);
+        // convert to integers (jcr only supports long)
+        if ( properties.get(EventUtil.PROPERTY_JOB_RETRIES) != null ) {
+            properties.put(EventUtil.PROPERTY_JOB_RETRIES, Integer.valueOf(properties.get(EventUtil.PROPERTY_JOB_RETRIES).toString()));
+        }
+        if ( properties.get(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
+            properties.put(EventUtil.PROPERTY_JOB_RETRY_COUNT, Integer.valueOf(properties.get(EventUtil.PROPERTY_JOB_RETRY_COUNT).toString()));
+        }
+        // add application id
+        properties.put(EventUtil.PROPERTY_APPLICATION, eventNode.getProperty(EventHelper.NODE_PROPERTY_APPLICATION).getString());
+    }
+
+    /**
+     * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
+     */
+    public void onEvent(EventIterator iter) {
+        // we create an own session here
+        Session s = null;
+        try {
+            s = this.createSession();
+            while ( iter.hasNext() ) {
+                final javax.jcr.observation.Event event = iter.nextEvent();
+                if ( event.getType() == javax.jcr.observation.Event.PROPERTY_CHANGED
+                   || event.getType() == javax.jcr.observation.Event.PROPERTY_REMOVED) {
+                    try {
+                        final String propPath = event.getPath();
+                        int pos = propPath.lastIndexOf('/');
+                        final String nodePath = propPath.substring(0, pos);
+                        final String propertyName = propPath.substring(pos+1);
+
+                        // we are only interested in unlocks
+                        if ( "jcr:lockOwner".equals(propertyName) ) {
+                            boolean doNotProcess = false;
+                            synchronized ( this.deletedJobs ) {
+                                doNotProcess = this.deletedJobs.remove(nodePath);
+                            }
+                            if ( !doNotProcess ) {
+                                final Node eventNode = (Node) s.getItem(nodePath);
+                                if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
+                                    try {
+                                        final EventInfo info = new EventInfo();
+                                        info.event = this.readEvent(eventNode);
+                                        info.nodePath = nodePath;
+                                        try {
+                                            this.queue.put(info);
+                                        } catch (InterruptedException e) {
+                                            // we ignore this exception as this should never occur
+                                            this.ignoreException(e);
+                                        }
+                                    } catch (ClassNotFoundException cnfe) {
+                                        // store path for lazy loading
+                                        synchronized ( this.unloadedJobs ) {
+                                            this.unloadedJobs.add(nodePath);
+                                        }
+                                        this.ignoreException(cnfe);
+                                    }
+                                }
+                            }
+                        }
+                    } catch (RepositoryException re) {
+                        this.logger.error("Exception during jcr event processing.", re);
+                    }
+                }
+            }
+        } catch (RepositoryException re) {
+            this.logger.error("Unable to create a session.", re);
+        } finally {
+            if ( s != null ) {
+                s.logout();
+            }
+        }
+    }
+
+    /**
+     * Load all active jobs from the repository.
+     * @throws RepositoryException
+     */
+    private void loadJobs() {
+        try {
+            final QueryManager qManager = this.backgroundSession.getWorkspace().getQueryManager();
+            final StringBuffer buffer = new StringBuffer("/jcr:root");
+            buffer.append(this.repositoryPath);
+            buffer.append("//element(*, ");
+            buffer.append(this.getEventNodeType());
+            buffer.append(") order by @");
+            buffer.append(EventHelper.NODE_PROPERTY_CREATED);
+            buffer.append(" ascending");
+            final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
+            final NodeIterator result = q.execute().getNodes();
+            while ( result.hasNext() ) {
+                final Node eventNode = result.nextNode();
+                if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
+                    final String nodePath = eventNode.getPath();
+                    try {
+                        final Event event = this.readEvent(eventNode);
+                        final EventInfo info = new EventInfo();
+                        info.event = event;
+                        info.nodePath = nodePath;
+                        try {
+                            this.queue.put(info);
+                        } catch (InterruptedException e) {
+                            // we ignore this exception as this should never occur
+                            this.ignoreException(e);
+                        }
+                    } catch (ClassNotFoundException cnfe) {
+                        // store path for lazy loading
+                        synchronized ( this.unloadedJobs ) {
+                            this.unloadedJobs.add(nodePath);
+                        }
+                        this.ignoreException(cnfe);
+                    } catch (RepositoryException re) {
+                        this.logger.error("Unable to load stored job from " + nodePath, re);
+                    }
+                }
+            }
+        } catch (RepositoryException re) {
+            this.logger.error("Exception during initial loading of stored jobs.", re);
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.EventUtil.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event, java.lang.String)
+     */
+    public boolean sendAcknowledge(Event job, String eventNodePath) {
+        synchronized ( this.processingEventsList ) {
+            // if the event is still in the processing list, we confirm the ack
+            final Object ack = this.processingEventsList.remove(eventNodePath);
+            return ack != null;
+        }
+
+    }
+
+    /**
+     * This is a notification from the component which processed the job.
+     *
+     * @see org.apache.sling.event.EventUtil.JobStatusNotifier#finishedJob(org.osgi.service.event.Event, String, boolean)
+     */
+    public boolean finishedJob(Event job, String eventNodePath, boolean shouldReschedule) {
+        // let's remove the event from our processing list
+        // this is just a sanity check, as usually the job should have been
+        // removed during sendAcknowledge.
+        synchronized ( this.processingEventsList ) {
+            this.processingEventsList.remove(eventNodePath);
+        }
+
+        boolean reschedule = shouldReschedule;
+        if ( shouldReschedule ) {
+            // check if we exceeded the number of retries
+            int retries = this.maxJobRetries;
+            if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRIES) != null ) {
+                retries = (Integer) job.getProperty(EventUtil.PROPERTY_JOB_RETRIES);
+            }
+            int retryCount = 0;
+            if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
+                retryCount = (Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT);
+            }
+            retryCount++;
+            if ( retries != -1 && retryCount > retries ) {
+                reschedule = false;
+            }
+            if ( reschedule ) {
+                // update event with retry count and retries
+                final Dictionary<String, Object> newProperties = new EventPropertiesMap(job);
+                newProperties.put(EventUtil.PROPERTY_JOB_RETRY_COUNT, retryCount);
+                newProperties.put(EventUtil.PROPERTY_JOB_RETRIES, retries);
+                job = new Event(job.getTopic(), newProperties);
+            }
+        }
+        final boolean parallelProcessing = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null
+                                        || job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+        EventInfo putback = null;
+        // we have to use the same session for unlocking that we used for locking!
+        synchronized ( this.backgroundLock ) {
+            // we might get here asnyc while this service has already been shutdown!
+            if ( this.backgroundSession == null ) {
+                // we can only return false here
+                return false;
+            }
+            try {
+                this.backgroundSession.refresh(false);
+                // check if the job has been cancelled
+                if ( !this.backgroundSession.itemExists(eventNodePath) ) {
+                    return true;
+                }
+                final Node eventNode = (Node) this.backgroundSession.getItem(eventNodePath);
+                boolean unlock = true;
+                try {
+                    if ( !reschedule ) {
+                        synchronized ( this.deletedJobs ) {
+                            this.deletedJobs.add(eventNodePath);
+                        }
+                        // unlock node
+                        try {
+                            eventNode.unlock();
+                        } catch (RepositoryException e) {
+                            // if unlock fails, we silently ignore this
+                            this.ignoreException(e);
+                        }
+                        unlock = false;
+                        final String jobId = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
+                        if ( jobId == null ) {
+                            // remove node from repository if no job is set
+                            final Node parentNode = eventNode.getParent();
+                            eventNode.remove();
+                            parentNode.save();
+                        } else {
+                            eventNode.setProperty(EventHelper.NODE_PROPERTY_FINISHED, Calendar.getInstance());
+                            eventNode.save();
+                        }
+                    }
+                } catch (RepositoryException re) {
+                    // if an exception occurs, we just log
+                    this.logger.error("Exception during job finishing.", re);
+                } finally {
+                    if ( !parallelProcessing) {
+                        final String jobTopic = (String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+                        synchronized ( this.processingMap ) {
+                            this.processingMap.put(jobTopic, Boolean.FALSE);
+                        }
+                    } else {
+                        if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) == null ) {
+                            this.parallelJobCount--;
+                        }
+                    }
+                    if ( unlock ) {
+                        synchronized ( this.deletedJobs ) {
+                            this.deletedJobs.add(eventNodePath);
+                        }
+                        // unlock node
+                        try {
+                            eventNode.unlock();
+                        } catch (RepositoryException e) {
+                            // if unlock fails, we silently ignore this
+                            this.ignoreException(e);
+                        }
+                    }
+                }
+                if ( reschedule ) {
+                    // update retry count and retries in the repository
+                    try {
+                        eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRIES, (Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRIES));
+                        eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT, (Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT));
+                        eventNode.save();
+                    } catch (RepositoryException re) {
+                        // if an exception occurs, we just log
+                        this.logger.error("Exception during job updating job rescheduling information.", re);
+                    }
+                    final EventInfo info = new EventInfo();
+                    try {
+                        info.event = job;
+                        info.nodePath = eventNode.getPath();
+                    } catch (RepositoryException e) {
+                        // this should never happen
+                        this.ignoreException(e);
+                    }
+                    // if this is an own job queue, we simply signal the queue to continue
+                    // it will pick up the event and either reschedule or wait
+                    if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
+                        // we know the queue exists
+                        final JobBlockingQueue jobQueue;
+                        synchronized ( this.jobQueues ) {
+                            jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME));
+                        }
+                        synchronized ( jobQueue.getLock()) {
+                            jobQueue.notifyFinish(info);
+                        }
+                    } else {
+
+                        // delay rescheduling?
+                        if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+                            putback = info;
+                        } else {
+                            // put directly into queue
+                            try {
+                                queue.put(info);
+                            } catch (InterruptedException e) {
+                                // this should never happen
+                                this.ignoreException(e);
+                            }
+                        }
+                    }
+                } else {
+                    // if this is an own job queue, we simply signal the queue to continue
+                    // it will pick up the event and continue with the next event
+                    if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
+                        // we know the queue exists
+                        final JobBlockingQueue jobQueue;
+                        synchronized ( this.jobQueues ) {
+                            jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME));
+                        }
+                        synchronized ( jobQueue.getLock()) {
+                            jobQueue.notifyFinish(null);
+                        }
+                    }
+                }
+            } catch (RepositoryException re) {
+                this.logger.error("Unable to create new session.", re);
+                return false;
+            }
+        }
+        if ( putback != null ) {
+            final EventInfo info = putback;
+            final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+            final Date fireDate = new Date();
+            fireDate.setTime(System.currentTimeMillis() + delay);
+
+            final Runnable t = new Runnable() {
+                public void run() {
+                    try {
+                        queue.put(info);
+                    } catch (InterruptedException e) {
+                        // this should never happen
+                        ignoreException(e);
+                    }
+                }
+            };
+            try {
+                this.scheduler.fireJobAt(null, t, null, fireDate);
+            } catch (Exception e) {
+                // we ignore the exception and just put back the job in the queue
+                ignoreException(e);
+                t.run();
+            }
+        }
+        if ( !shouldReschedule ) {
+            return true;
+        }
+        return reschedule;
+    }
+
+    /**
+     * Search for job nodes
+     * @param topic The job topic
+     * @param filterProps optional filter props
+     * @param locked only active jobs?
+     * @return
+     * @throws RepositoryException
+     */
+    private Collection<Event> queryJobs(final String topic,
+                                        final Boolean locked,
+                                        final Map<String, Object>... filterProps)  {
+        // we create a new session
+        Session s = null;
+        final List<Event> jobs = new ArrayList<Event>();
+        try {
+            s = this.createSession();
+            final QueryManager qManager = s.getWorkspace().getQueryManager();
+            final StringBuffer buffer = new StringBuffer("/jcr:root");
+            buffer.append(this.repositoryPath);
+            if ( topic != null ) {
+                buffer.append('/');
+                buffer.append(topic.replace('/', '.'));
+            }
+            buffer.append("//element(*, ");
+            buffer.append(this.getEventNodeType());
+            buffer.append(") [not(@");
+            buffer.append(EventHelper.NODE_PROPERTY_FINISHED);
+            buffer.append(")");
+            if ( locked != null ) {
+                if ( locked ) {
+                    buffer.append(" and @jcr:lockOwner");
+                } else {
+                    buffer.append(" and not(@jcr:lockOwner)");
+                }
+            }
+            if ( filterProps != null && filterProps.length > 0 ) {
+                buffer.append(" and (");
+                int index = 0;
+                for (Map<String,Object> template : filterProps) {
+                    if ( index > 0 ) {
+                        buffer.append(" or ");
+                    }
+                    buffer.append('(');
+                    final Iterator<Map.Entry<String, Object>> i = template.entrySet().iterator();
+                    boolean first = true;
+                    while ( i.hasNext() ) {
+                        final Map.Entry<String, Object> current = i.next();
+                        // check prop name first
+                        final String propName = EventUtil.getNodePropertyName(current.getKey());
+                        if ( propName != null ) {
+                            // check value
+                            final Value value = EventUtil.getNodePropertyValue(s.getValueFactory(), current.getValue());
+                            if ( value != null ) {
+                                if ( first ) {
+                                    first = false;
+                                    buffer.append('@');
+                                } else {
+                                    buffer.append(" and @");
+                                }
+                                buffer.append(propName);
+                                buffer.append(" = '");
+                                buffer.append(current.getValue());
+                                buffer.append("'");
+                            }
+                        }
+                    }
+                    buffer.append(')');
+                    index++;
+                }
+                buffer.append(')');
+            }
+            buffer.append("]");
+            buffer.append(" order by @");
+            buffer.append(EventHelper.NODE_PROPERTY_CREATED);
+            buffer.append(" ascending");
+            final String queryString = buffer.toString();
+            logger.debug("Executing job query {}.", queryString);
+
+            final Query q = qManager.createQuery(queryString, Query.XPATH);
+            final NodeIterator iter = q.execute().getNodes();
+            while ( iter.hasNext() ) {
+                final Node eventNode = iter.nextNode();
+                try {
+                    final Event event = this.readEvent(eventNode);
+                    jobs.add(event);
+                } catch (ClassNotFoundException cnfe) {
+                    // in the case of a class not found exception we just ignore the exception
+                    this.ignoreException(cnfe);
+                }
+            }
+        } catch (RepositoryException e) {
+            // in the case of an error, we return an empty list
+            this.ignoreException(e);
+        } finally {
+            if ( s != null) {
+                s.logout();
+            }
+        }
+        return jobs;
+    }
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#getCurrentJobs(java.lang.String)
+     */
+    public Collection<Event> getCurrentJobs(String topic) {
+        return this.getCurrentJobs(topic, (Map<String, Object>[])null);
+    }
+
+    /**
+     * This is deprecated.
+     */
+    public Collection<Event> scheduledJobs(String topic) {
+        return this.getScheduledJobs(topic);
+    }
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#getScheduledJobs(java.lang.String)
+     */
+    public Collection<Event> getScheduledJobs(String topic) {
+        return this.getScheduledJobs(topic, (Map<String, Object>[])null);
+    }
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#getCurrentJobs(java.lang.String, java.util.Map...)
+     */
+    public Collection<Event> getCurrentJobs(String topic, Map<String, Object>... filterProps) {
+        return this.queryJobs(topic, true, filterProps);
+    }
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#getScheduledJobs(java.lang.String, java.util.Map...)
+     */
+    public Collection<Event> getScheduledJobs(String topic, Map<String, Object>... filterProps) {
+        return this.queryJobs(topic, false, filterProps);
+    }
+
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#getAllJobs(java.lang.String, java.util.Map...)
+     */
+    public Collection<Event> getAllJobs(String topic, Map<String, Object>... filterProps) {
+        return this.queryJobs(topic, null, filterProps);
+    }
+
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#cancelJob(java.lang.String, java.lang.String)
+     */
+    public void cancelJob(String topic, String jobId) {
+        if ( jobId != null && topic != null ) {
+            this.cancelJob(this.getNodePath(topic, jobId));
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#cancelJob(java.lang.String)
+     */
+    public void cancelJob(String jobId) {
+        if ( jobId != null ) {
+            synchronized ( this.writeLock ) {
+                try {
+                    this.writerSession.refresh(false);
+                } catch (RepositoryException e) {
+                    this.ignoreException(e);
+                }
+                try {
+                    if ( this.writerSession.itemExists(jobId) ) {
+                        final Item item = this.writerSession.getItem(jobId);
+                        final Node parentNode = item.getParent();
+                        item.remove();
+                        parentNode.save();
+                    }
+                } catch (RepositoryException e) {
+                    this.logger.error("Error during cancelling job at " + jobId, e);
+                }
+            }
+        }
+    }
+
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#wakeUpJobQueue(java.lang.String)
+     */
+    public void wakeUpJobQueue(String jobQueueName) {
+        if ( jobQueueName != null ) {
+            synchronized ( this.jobQueues ) {
+                final JobBlockingQueue queue = this.jobQueues.get(jobQueueName);
+                if ( queue != null && queue.isSleeping() ) {
+                    final String schedulerJobName = queue.getSchedulerJobName();
+                    final Thread thread = queue.getSleepingThread();
+                    if ( schedulerJobName != null ) {
+                        this.scheduler.removeJob(schedulerJobName);
+                    }
+                    if ( thread != null ) {
+                        thread.interrupt();
+                    }
+                }
+            }
+        }
+    }
+
+
+    private static final class StartedJobInfo {
+        public final Event event;
+        public final String nodePath;
+        public final long  started;
+
+        public StartedJobInfo(final Event e, final String path, final long started) {
+            this.event = e;
+            this.nodePath = path;
+            this.started = started;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java b/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
new file mode 100644
index 0000000..fd54071
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
@@ -0,0 +1,782 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import javax.jcr.Item;
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.Value;
+import javax.jcr.lock.Lock;
+import javax.jcr.lock.LockException;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.query.Query;
+import javax.jcr.query.QueryManager;
+
+import org.apache.sling.commons.scheduler.Job;
+import org.apache.sling.commons.scheduler.JobContext;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.TimedEventStatusProvider;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+
+/**
+ * An event handler for timed events.
+ *
+ * @scr.component metatype="no"
+ * @scr.service interface="TimedEventStatusProvider"
+ * @scr.property name="event.topics" refValues="EventUtil.TOPIC_TIMED_EVENT"
+ *               values.updated="org/osgi/framework/BundleEvent/UPDATED"
+ *               values.started="org/osgi/framework/BundleEvent/STARTED"
+ * @scr.property name="repository.path" value="/var/eventing/timed-jobs"
+ */
+public class TimedJobHandler
+    extends AbstractRepositoryEventHandler
+    implements Job, TimedEventStatusProvider {
+
+    protected static final String JOB_TOPIC = "topic";
+
+    protected static final String JOB_CONFIG = "config";
+
+    protected static final String JOB_SCHEDULE_INFO = "info";
+
+    /** @scr.reference */
+    protected Scheduler scheduler;
+
+    /** Unloaded events. */
+    protected Set<String>unloadedEvents = new HashSet<String>();
+
+    /** Sync lock */
+    private final Object writeLock = new Object();
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#startWriterSession()
+     */
+    protected void startWriterSession() throws RepositoryException {
+        super.startWriterSession();
+        // load timed events from repository
+        this.loadEvents();
+        this.writerSession.getWorkspace().getObservationManager()
+            .addEventListener(this, javax.jcr.observation.Event.PROPERTY_CHANGED|javax.jcr.observation.Event.PROPERTY_REMOVED, this.repositoryPath, true, null, null, true);
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
+     */
+    protected void processWriteQueue() {
+        while ( this.running ) {
+            Event event = null;
+            try {
+                event = this.writeQueue.take();
+            } catch (InterruptedException e) {
+                // we ignore this
+                this.ignoreException(e);
+            }
+            if ( this.running && event != null ) {
+                ScheduleInfo scheduleInfo = null;
+                try {
+                    scheduleInfo = new ScheduleInfo(event);
+                } catch (IllegalArgumentException iae) {
+                    this.logger.error(iae.getMessage());
+                }
+                if ( scheduleInfo != null ) {
+                    final EventInfo info = new EventInfo();
+                    info.event = event;
+
+                    // write event and update path
+                    // if something went wrong we get the node path and reschedule
+                    synchronized ( this.writeLock ) {
+                        info.nodePath = this.persistEvent(info.event, scheduleInfo);
+                    }
+                    if ( info.nodePath != null ) {
+                        try {
+                            this.queue.put(info);
+                        } catch (InterruptedException e) {
+                            // this should never happen, so we ignore it
+                            this.ignoreException(e);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#runInBackground()
+     */
+    protected void runInBackground() {
+        while ( this.running ) {
+            // so let's wait/get the next info from the queue
+            EventInfo info = null;
+            try {
+                info = this.queue.take();
+            } catch (InterruptedException e) {
+                // we ignore this
+                this.ignoreException(e);
+            }
+            if ( info != null && this.running ) {
+                synchronized ( this.writeLock ) {
+                    ScheduleInfo scheduleInfo = null;
+                    try {
+                        scheduleInfo = new ScheduleInfo(info.event);
+                    } catch (IllegalArgumentException iae) {
+                        this.logger.error(iae.getMessage());
+                    }
+                    if ( scheduleInfo != null ) {
+                        try {
+                            this.writerSession.refresh(true);
+                            if ( this.writerSession.itemExists(info.nodePath) ) {
+                                final Node eventNode = (Node) this.writerSession.getItem(info.nodePath);
+                                if ( !eventNode.isLocked() ) {
+                                    // lock node
+                                    Lock lock = null;
+                                    try {
+                                        lock = eventNode.lock(false, true);
+                                    } catch (RepositoryException re) {
+                                        // lock failed which means that the node is locked by someone else, so we don't have to requeue
+                                    }
+                                    if ( lock != null ) {
+                                        // if something went wrong, we reschedule
+                                        if ( !this.processEvent(info.event, scheduleInfo) ) {
+                                            try {
+                                                this.queue.put(info);
+                                            } catch (InterruptedException e) {
+                                                // this should never happen, so we ignore it
+                                                this.ignoreException(e);
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        } catch (RepositoryException e) {
+                            // ignore
+                            this.ignoreException(e);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    protected String persistEvent(final Event event, final ScheduleInfo scheduleInfo) {
+        try {
+            // get parent node
+            final Node parentNode = this.ensureRepositoryPath();
+            final String nodeName = scheduleInfo.jobId;
+            // is there already a node?
+            final Node foundNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null;
+            Lock lock = null;
+            if ( scheduleInfo.isStopEvent() ) {
+                // if this is a stop event, we should remove the node from the repository
+                // if there is no node someone else was faster and we can ignore this
+                if ( foundNode != null ) {
+                    try {
+                        foundNode.remove();
+                        parentNode.save();
+                    } catch (LockException le) {
+                        // if someone else has the lock this is fine
+                    }
+                }
+                // stop the scheduler
+                processEvent(event, scheduleInfo);
+            } else {
+                // if there is already a node, it means we must handle an update
+                if ( foundNode != null ) {
+                    try {
+                        foundNode.remove();
+                        parentNode.save();
+                    } catch (LockException le) {
+                        // if someone else has the lock this is fine
+                    }
+                    // create a stop event
+                    processEvent(event, scheduleInfo.getStopInfo());
+                }
+                // we only write the event if this is a local one
+                if ( EventUtil.isLocal(event) ) {
+
+                    // write event to repository, lock it and schedule the event
+                    final Node eventNode = writeEvent(event, nodeName);
+                    lock = eventNode.lock(false, true);
+                }
+            }
+
+            if ( lock != null ) {
+                // if something went wrong, we reschedule
+                if ( !this.processEvent(event, scheduleInfo) ) {
+                    final String path = lock.getNode().getPath();
+                    lock.getNode().unlock();
+                    return path;
+                }
+            }
+        } catch (RepositoryException re ) {
+            // something went wrong, so let's log it
+            this.logger.error("Exception during writing new job to repository.", re);
+        }
+        return null;
+    }
+
+    /**
+     * Process the event.
+     * If a scheduler is available, a job is scheduled or stopped.
+     * @param event The incomming event.
+     * @return
+     */
+    protected boolean processEvent(final Event event, final ScheduleInfo scheduleInfo) {
+        final Scheduler localScheduler = this.scheduler;
+        if ( localScheduler != null ) {
+            // is this a stop event?
+            if ( scheduleInfo.isStopEvent() ) {
+                if ( this.logger.isDebugEnabled() ) {
+                    this.logger.debug("Stopping timed event " + event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC) + "(" + scheduleInfo.jobId + ")");
+                }
+                try {
+                    localScheduler.removeJob(scheduleInfo.jobId);
+                } catch (NoSuchElementException nsee) {
+                    // this can happen if the job is scheduled on another node
+                    // so we can just ignore this
+                }
+                return true;
+            }
+            // we ignore remote job events
+            if ( !EventUtil.isLocal(event) ) {
+                return true;
+            }
+
+            // Create configuration for scheduled job
+            final Map<String, Serializable> config = new HashMap<String, Serializable>();
+            // copy properties
+            final Hashtable<String, Object> properties = new Hashtable<String, Object>();
+            config.put(JOB_TOPIC, (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC));
+            final String[] names = event.getPropertyNames();
+            if ( names != null ) {
+                for(int i=0; i<names.length; i++) {
+                    properties.put(names[i], event.getProperty(names[i]));
+                }
+            }
+            config.put(JOB_CONFIG, properties);
+            config.put(JOB_SCHEDULE_INFO, scheduleInfo);
+
+            try {
+                if ( scheduleInfo.expression != null ) {
+                    if ( this.logger.isDebugEnabled() ) {
+                        this.logger.debug("Adding timed event " + config.get(JOB_TOPIC) + "(" + scheduleInfo.jobId + ")" + " with cron expression " + scheduleInfo.expression);
+                    }
+                    localScheduler.addJob(scheduleInfo.jobId, this, config, scheduleInfo.expression, false);
+                } else if ( scheduleInfo.period != null ) {
+                    if ( this.logger.isDebugEnabled() ) {
+                        this.logger.debug("Adding timed event " + config.get(JOB_TOPIC) + "(" + scheduleInfo.jobId + ")" + " with period " + scheduleInfo.period);
+                    }
+                    localScheduler.addPeriodicJob(scheduleInfo.jobId, this, config, scheduleInfo.period, false);
+                } else {
+                    // then it must be date
+                    if ( this.logger.isDebugEnabled() ) {
+                        this.logger.debug("Adding timed event " + config.get(JOB_TOPIC) + "(" + scheduleInfo.jobId + ")" + " with date " + scheduleInfo.date);
+                    }
+                    localScheduler.fireJobAt(scheduleInfo.jobId, this, config, scheduleInfo.date);
+                }
+                return true;
+            } catch (Exception e) {
+                this.ignoreException(e);
+            }
+        } else {
+            this.logger.error("No scheduler available to start timed event " + event);
+        }
+        return false;
+    }
+
+    /**
+     * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
+     */
+    public void onEvent(EventIterator iter) {
+        // we create an own session here
+        Session s = null;
+        try {
+            s = this.createSession();
+            while ( iter.hasNext() ) {
+                final javax.jcr.observation.Event event = iter.nextEvent();
+                if ( event.getType() == javax.jcr.observation.Event.PROPERTY_CHANGED
+                    || event.getType() == javax.jcr.observation.Event.PROPERTY_REMOVED) {
+
+                    final String propPath = event.getPath();
+                    int pos = propPath.lastIndexOf('/');
+                    final String nodePath = propPath.substring(0, pos);
+                    final String propertyName = propPath.substring(pos+1);
+                    // we are only interested in unlocks
+                    if ( "jcr:lockOwner".equals(propertyName) ) {
+                        try {
+                            final Node eventNode = (Node) s.getItem(nodePath);
+                            if ( !eventNode.isLocked() ) {
+                                try {
+                                    final EventInfo info = new EventInfo();
+                                    info.event = this.readEvent(eventNode);
+                                    info.nodePath =nodePath;
+                                    try {
+                                        this.queue.put(info);
+                                    } catch (InterruptedException e) {
+                                        // we ignore this exception as this should never occur
+                                        this.ignoreException(e);
+                                    }
+                                } catch (ClassNotFoundException cnfe) {
+                                    // add it to the unloaded set
+                                    synchronized (unloadedEvents) {
+                                        this.unloadedEvents.add(nodePath);
+                                    }
+                                    this.ignoreException(cnfe);
+                                }
+                            }
+                        } catch (RepositoryException re) {
+                            this.logger.error("Exception during jcr event processing.", re);
+                        }
+                    }
+                }
+            }
+        } catch (RepositoryException re) {
+            this.logger.error("Unable to create a session.", re);
+        } finally {
+            if ( s != null ) {
+                s.logout();
+            }
+        }
+    }
+
+    /**
+     * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+     */
+    public void handleEvent(Event event) {
+        if ( event.getTopic().equals(EventUtil.TOPIC_TIMED_EVENT) ) {
+            // queue the event in order to respond quickly
+            try {
+                this.writeQueue.put(event);
+            } catch (InterruptedException e) {
+                // this should never happen
+                this.ignoreException(e);
+            }
+        } else {
+            // bundle event started or updated
+            boolean doIt = false;
+            synchronized ( this.unloadedEvents ) {
+                if ( this.unloadedEvents.size() > 0 ) {
+                    doIt = true;
+                }
+            }
+            if ( doIt ) {
+                final Runnable t = new Runnable() {
+
+                    public void run() {
+                        synchronized (unloadedEvents) {
+                            Session s = null;
+                            final Set<String> newUnloadedEvents = new HashSet<String>();
+                            newUnloadedEvents.addAll(unloadedEvents);
+                            try {
+                                s = createSession();
+                                for(String path : unloadedEvents ) {
+                                    newUnloadedEvents.remove(path);
+                                    try {
+                                        if ( s.itemExists(path) ) {
+                                            final Node eventNode = (Node) s.getItem(path);
+                                            if ( !eventNode.isLocked() ) {
+                                                try {
+                                                    final EventInfo info = new EventInfo();
+                                                    info.event = readEvent(eventNode);
+                                                    info.nodePath = path;
+                                                    try {
+                                                        queue.put(info);
+                                                    } catch (InterruptedException e) {
+                                                        // we ignore this exception as this should never occur
+                                                        ignoreException(e);
+                                                    }
+                                                } catch (ClassNotFoundException cnfe) {
+                                                    newUnloadedEvents.add(path);
+                                                    ignoreException(cnfe);
+                                                }
+                                            }
+                                        }
+                                    } catch (RepositoryException re) {
+                                        // we ignore this and readd
+                                        newUnloadedEvents.add(path);
+                                        ignoreException(re);
+                                    }
+                                }
+                            } catch (RepositoryException re) {
+                                // unable to create session, so we try it again next time
+                                ignoreException(re);
+                            } finally {
+                                if ( s != null ) {
+                                    s.logout();
+                                }
+                                unloadedEvents.clear();
+                                unloadedEvents.addAll(newUnloadedEvents);
+                            }
+                        }
+                    }
+
+                };
+                this.threadPool.execute(t);
+            }
+        }
+    }
+
+    /**
+     * @see org.apache.sling.commons.scheduler.Job#execute(org.apache.sling.commons.scheduler.JobContext)
+     */
+    public void execute(JobContext context) {
+        final String topic = (String) context.getConfiguration().get(JOB_TOPIC);
+        @SuppressWarnings("unchecked")
+        final Dictionary<Object, Object> properties = (Dictionary<Object, Object>) context.getConfiguration().get(JOB_CONFIG);
+        final EventAdmin ea = this.eventAdmin;
+        if ( ea != null ) {
+            try {
+                ea.postEvent(new Event(topic, properties));
+            } catch (IllegalArgumentException iae) {
+                this.logger.error("Scheduled event has illegal topic: " + topic, iae);
+            }
+        } else {
+            this.logger.warn("Unable to send timed event as no event admin service is available.");
+        }
+        final ScheduleInfo info = (ScheduleInfo) context.getConfiguration().get(JOB_SCHEDULE_INFO);
+        // is this job scheduled for a specific date?
+        if ( info.date != null ) {
+            // we can remove it from the repository
+            // we create an own session here
+            Session s = null;
+            try {
+                s = this.createSession();
+                if ( s.itemExists(this.repositoryPath) ) {
+                    final Node parentNode = (Node)s.getItem(this.repositoryPath);
+                    final String nodeName = info.jobId;
+                    final Node eventNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null;
+                    if ( eventNode != null ) {
+                        try {
+                            eventNode.remove();
+                            parentNode.save();
+                        } catch (RepositoryException re) {
+                            // we ignore the exception if removing fails
+                            ignoreException(re);
+                        }
+                    }
+                }
+            } catch (RepositoryException re) {
+                this.logger.error("Unable to create a session.", re);
+            } finally {
+                if ( s != null ) {
+                    s.logout();
+                }
+            }
+        }
+    }
+
+    /**
+     * Load all active timed events from the repository.
+     * @throws RepositoryException
+     */
+    protected void loadEvents() {
+        try {
+            final QueryManager qManager = this.writerSession.getWorkspace().getQueryManager();
+            final StringBuffer buffer = new StringBuffer("/jcr:root");
+            buffer.append(this.repositoryPath);
+            buffer.append("//element(*, ");
+            buffer.append(this.getEventNodeType());
+            buffer.append(")");
+            final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
+            final NodeIterator result = q.execute().getNodes();
+            while ( result.hasNext() ) {
+                final Node eventNode = result.nextNode();
+                if ( !eventNode.isLocked() ) {
+                    final String nodePath = eventNode.getPath();
+                    try {
+                        final Event event = this.readEvent(eventNode);
+                        final EventInfo info = new EventInfo();
+                        info.event = event;
+                        info.nodePath = nodePath;
+                        try {
+                            this.queue.put(info);
+                        } catch (InterruptedException e) {
+                            // we ignore this exception as this should never occur
+                            this.ignoreException(e);
+                        }
+                    } catch (ClassNotFoundException cnfe) {
+                        // add it to the unloaded set
+                        synchronized (unloadedEvents) {
+                            this.unloadedEvents.add(nodePath);
+                        }
+                        this.ignoreException(cnfe);
+                    } catch (RepositoryException re) {
+                        // if reading an event fails, we ignore this
+                        this.ignoreException(re);
+                    }
+                }
+            }
+        } catch (RepositoryException re) {
+            this.logger.error("Exception during initial loading of stored timed events.", re);
+        }
+    }
+
+    /**
+     * @see org.apache.sling.engine.event.impl.JobPersistenceHandler#addNodeProperties(javax.jcr.Node, org.osgi.service.event.Event)
+     */
+    protected void addNodeProperties(Node eventNode, Event event)
+    throws RepositoryException {
+        super.addNodeProperties(eventNode, event);
+        eventNode.setProperty(EventHelper.NODE_PROPERTY_TOPIC, (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC));
+        final ScheduleInfo info = new ScheduleInfo(event);
+        if ( info.date != null ) {
+            final Calendar c = Calendar.getInstance();
+            c.setTime(info.date);
+            eventNode.setProperty(EventHelper.NODE_PROPERTY_TE_DATE, c);
+        }
+        if ( info.expression != null ) {
+            eventNode.setProperty(EventHelper.NODE_PROPERTY_TE_EXPRESSION, info.expression);
+        }
+        if ( info.period != null ) {
+            eventNode.setProperty(EventHelper.NODE_PROPERTY_TE_PERIOD, info.period.longValue());
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#getEventNodeType()
+     */
+    protected String getEventNodeType() {
+        return EventHelper.TIMED_EVENT_NODE_TYPE;
+    }
+
+    protected static final class ScheduleInfo implements Serializable {
+
+        public final String expression;
+        public final Long   period;
+        public final Date   date;
+        public final String jobId;
+
+        public ScheduleInfo(final Event event)
+        throws IllegalArgumentException {
+            // let's see if a schedule information is specified or if the job should be stopped
+            this.expression = (String) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_SCHEDULE);
+            this.period = (Long) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_PERIOD);
+            this.date = (Date) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_DATE);
+            int count = 0;
+            if ( this.expression != null) {
+                count++;
+            }
+            if ( this.period != null ) {
+                count++;
+            }
+            if ( this.date != null ) {
+                count++;
+            }
+            if ( count > 1 ) {
+                throw new IllegalArgumentException("Only one configuration property from " + EventUtil.PROPERTY_TIMED_EVENT_SCHEDULE +
+                                      ", " + EventUtil.PROPERTY_TIMED_EVENT_PERIOD +
+                                      ", or " + EventUtil.PROPERTY_TIMED_EVENT_DATE + " should be used.");
+            }
+            // we create a job id consisting of the real event topic and an (optional) id
+            // if the event contains a timed event id or a job id we'll append that to the name
+            String topic = (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC);
+            if ( topic == null ) {
+                throw new IllegalArgumentException("Timed event does not contain required property " + EventUtil.PROPERTY_TIMED_EVENT_TOPIC);
+            }
+            String id = (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_ID);
+            String jId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
+
+            //this.jobId = getJobId(topic, id, jId);
+            this.jobId = getJobId(topic, id, jId);
+        }
+
+        private ScheduleInfo(String jobId) {
+            this.expression = null;
+            this.period = null;
+            this.date = null;
+            this.jobId = jobId;
+        }
+
+        public ScheduleInfo getStopInfo() {
+            return new ScheduleInfo(this.jobId);
+        }
+
+        public boolean isStopEvent() {
+            return this.expression == null && this.period == null && this.date == null;
+        }
+
+        public static String getJobId(String topic, String timedEventId, String jobId) {
+            return topic.replace('/', '.') + "/TimedEvent " + (timedEventId != null ? EventHelper.filter(timedEventId) : "") + '_' + (jobId != null ? EventHelper.filter(jobId) : "");
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.TimedEventStatusProvider#getScheduledEvent(java.lang.String, java.lang.String, java.lang.String)
+     */
+    public Event getScheduledEvent(String topic, String eventId, String jobId) {
+        Session s = null;
+        try {
+            s = this.createSession();
+            if ( s.itemExists(this.repositoryPath) ) {
+                final Node parentNode = (Node)s.getItem(this.repositoryPath);
+                final String nodeName = ScheduleInfo.getJobId(topic, eventId, jobId);
+                final Node eventNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null;
+                if ( eventNode != null ) {
+                    return this.readEvent(eventNode);
+                }
+            }
+        } catch (RepositoryException re) {
+            this.logger.error("Unable to create a session.", re);
+        } catch (ClassNotFoundException e) {
+            this.ignoreException(e);
+        } finally {
+            if ( s != null ) {
+                s.logout();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @see org.apache.sling.event.TimedEventStatusProvider#getScheduledEvents(java.lang.String, java.util.Map...)
+     */
+    public Collection<Event> getScheduledEvents(String topic, Map<String, Object>... filterProps) {
+        // we create a new session
+        Session s = null;
+        final List<Event> jobs = new ArrayList<Event>();
+        try {
+            s = this.createSession();
+            final QueryManager qManager = s.getWorkspace().getQueryManager();
+            final StringBuffer buffer = new StringBuffer("/jcr:root");
+            buffer.append(this.repositoryPath);
+            if ( topic != null ) {
+                buffer.append('/');
+                buffer.append(topic.replace('/', '.'));
+            }
+            buffer.append("//element(*, ");
+            buffer.append(this.getEventNodeType());
+            buffer.append(")");
+            if ( filterProps != null && filterProps.length > 0 ) {
+                buffer.append(" [");
+                int index = 0;
+                for (Map<String,Object> template : filterProps) {
+                    if ( index > 0 ) {
+                        buffer.append(" or ");
+                    }
+                    buffer.append('(');
+                    final Iterator<Map.Entry<String, Object>> i = template.entrySet().iterator();
+                    boolean first = true;
+                    while ( i.hasNext() ) {
+                        final Map.Entry<String, Object> current = i.next();
+                        // check prop name first
+                        final String propName = EventUtil.getNodePropertyName(current.getKey());
+                        if ( propName != null ) {
+                            // check value
+                            final Value value = EventUtil.getNodePropertyValue(s.getValueFactory(), current.getValue());
+                            if ( value != null ) {
+                                if ( first ) {
+                                    first = false;
+                                    buffer.append('@');
+                                } else {
+                                    buffer.append(" and @");
+                                }
+                                buffer.append(propName);
+                                buffer.append(" = '");
+                                buffer.append(current.getValue());
+                                buffer.append("'");
+                            }
+                        }
+                    }
+                    buffer.append(')');
+                    index++;
+                }
+                buffer.append(']');
+            }
+            final String queryString = buffer.toString();
+            logger.debug("Executing job query {}.", queryString);
+
+            final Query q = qManager.createQuery(queryString, Query.XPATH);
+            final NodeIterator iter = q.execute().getNodes();
+            while ( iter.hasNext() ) {
+                final Node eventNode = iter.nextNode();
+                try {
+                    final Event event = this.readEvent(eventNode);
+                    jobs.add(event);
+                } catch (ClassNotFoundException cnfe) {
+                    // in the case of a class not found exception we just ignore the exception
+                    this.ignoreException(cnfe);
+                }
+            }
+        } catch (RepositoryException e) {
+            // in the case of an error, we return an empty list
+            this.ignoreException(e);
+        } finally {
+            if ( s != null) {
+                s.logout();
+            }
+        }
+        return jobs;
+    }
+
+    /**
+     * @see org.apache.sling.event.TimedEventStatusProvider#cancelTimedEvent(java.lang.String)
+     */
+    public void cancelTimedEvent(String jobId) {
+        synchronized ( this.writeLock ) {
+            try {
+                // is there a node?
+                final Item foundNode = this.writerSession.itemExists(jobId) ? this.writerSession.getItem(jobId) : null;
+                // we should remove the node from the repository
+                // if there is no node someone else was faster and we can ignore this
+                if ( foundNode != null ) {
+                    final Node parentNode = foundNode.getParent();
+                    try {
+                        foundNode.remove();
+                        parentNode.save();
+                    } catch (LockException le) {
+                        // if someone else has the lock this is fine
+                    }
+                }
+            } catch ( RepositoryException re) {
+                this.logger.error("Unable to cancel timed event: " + jobId, re);
+            }
+            // stop the scheduler
+            if ( this.logger.isDebugEnabled() ) {
+                this.logger.debug("Stopping timed event " + jobId);
+            }
+            final Scheduler localScheduler = this.scheduler;
+            if ( localScheduler != null ) {
+                try {
+                    localScheduler.removeJob(jobId);
+                } catch (NoSuchElementException nsee) {
+                    // this can happen if the job is scheduled on another node
+                    // so we can just ignore this
+                }
+            }
+        }
+    }
+}
diff --git a/src/main/resources/META-INF/DISCLAIMER b/src/main/resources/META-INF/DISCLAIMER
new file mode 100644
index 0000000..90850c2
--- /dev/null
+++ b/src/main/resources/META-INF/DISCLAIMER
@@ -0,0 +1,7 @@
+Apache Sling is an effort undergoing incubation at The Apache Software Foundation (ASF),
+sponsored by the Apache Jackrabbit PMC. Incubation is required of all newly accepted
+projects until a further review indicates that the infrastructure, communications,
+and decision making process have stabilized in a manner consistent with other
+successful ASF projects. While incubation status is not necessarily a reflection of
+the completeness or stability of the code, it does indicate that the project has yet
+to be fully endorsed by the ASF.
\ No newline at end of file
diff --git a/src/main/resources/META-INF/LICENSE b/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/src/main/resources/META-INF/NOTICE b/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..32a792a
--- /dev/null
+++ b/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,8 @@
+Apache Sling Event
+Copyright 2008-2009 The Apache Software Foundation
+
+Apache Sling is based on source code originally developed 
+by Day Software (http://www.day.com/).
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
diff --git a/src/main/resources/OSGI-INF/metatype/metatype.properties b/src/main/resources/OSGI-INF/metatype/metatype.properties
new file mode 100644
index 0000000..d3a14ad
--- /dev/null
+++ b/src/main/resources/OSGI-INF/metatype/metatype.properties
@@ -0,0 +1,111 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+#
+
+#
+# This file contains localization strings for configuration labels and
+# descriptions as used in the metatype.xml descriptor generated by the
+# the SCR plugin
+
+#
+# Distributing Event Handler
+dist.events.name = Apache Sling Distributing Event Handler 
+dist.events.description = Distributes local OSGi Event Admin events to \
+ other nodes of the same cluster. The events are written to the JCR \
+ repository for distribution to other nodes while events written to the \
+ repository are picked up and distributed locally through the OSGi Event Admin \
+ Service.   
+
+#
+# Job Event Handler
+job.events.name = Apache Sling Job Event Handler 
+job.events.description = Manages job scheduling on a single system as well \
+ as on a cluster. A Job runs only on a single cluster node. \
+ The respective scheduling is persisted in the repository and distributed \
+ amongst the cluster nodes through repository events. The jobs are started \
+ locally on a single cluster node through the OSGi Event Admin.
+
+sleep.time.name = Retry Interval
+sleep.time.description = The number of milliseconds to sleep between two \
+ consecutive retries of a job which failed and was set to be retried. The \
+ default value is 30 seconds. This value is only relevant if there is a single \
+ failed job in the queue. If there are multiple failed jobs, each job is \
+ retried in turn without an intervening delay.
+ 
+max.job.retries.name = Maximum Retries
+max.job.retries.description = The maximum number of times a failed job slated \
+ for retries is actually retried. If a job has been retried this number of \
+ times and still fails, it is not rescheduled and assumed to have failed. The \
+ default value is 10.
+
+jobscheduler.period.name = Event Cleanup Internal
+jobscheduler.period.description = Interval in seconds in which jobs older than \
+ a specific age (see Event Cleanup Age) are purged from the repository. \
+ The default value is 5 minutes (300 seconds).
+
+jobcleanup.period.name = Event Cleanup Age
+jobcleanup.period.description = The maximum age in minutes of persisted job to \
+ be purged from the repository during the cleanup run. The default is 5 \
+ minutes. Note that this setting defines the minimum time an event remains \
+ in the repository. 
+
+wait.for.ack.name = Acknowledge Waiting Time
+wait.for.ack.description = If a service is processing a job, it acknowledges this \
+ by sending a message to the Job Event Handler. If the Job Event Handler does not \
+ receive such a message in the configured time, it reschedules the job. The configured \
+ time is in seconds (default is 90 secs).
+
+max.parallel.jobs.name = Maximum Parallel Jobs
+max.parallel.jobs.description = The maximum number of parallel jobs started for the main \
+ queue.
+
+
+#
+# Event Pool
+event.pool.name = Apache Sling Event Thread Pool 
+event.pool.description = This is the thread pool used by the Apache Sling eventing support.
+
+minPoolSize.name = Min Pool Size
+minPoolSize.description = The minimum pool size. The minimum pool size should be \
+ higher than 20. Approx 10 threads are in use by the system, so a pool size of 20 \
+ allows to process 10 events in parallel.
+
+maxPoolSize.name = Max Pool Size
+maxPoolSize.description = The maximum pool size. The maximum pool size should be higher than \
+ the minimum pool size.
+
+queueSize.name = Queue Size
+queueSize.description = The maximum size of the thread queue if the pool is exhausted.
+
+#
+# Shared labels
+scheduler.period.name = Event Cleanup Internal
+scheduler.period.description = Interval in seconds in which events older than \
+ a specific age (see Event Cleanup Age) are purged from the repository. \
+ The default value is 30 minutes (1800 seconds).
+
+cleanup.period.name = Event Cleanup Age
+cleanup.period.description = The maximum age in minutes of persisted events to \
+ be purged from the repository during the cleanup run. The default is 15 \
+ minutes. Note that this setting defines the minimum time an event remains \
+ in the repository. 
+ 
+repository.path.name = Persistent Event Location
+repository.path.description = Absolute Path of the Repository location where \
+ events are persisted to be picked up by the event distribution mechanism. \
+ The default value is "/sling/events".
diff --git a/src/main/resources/SLING-INF/nodetypes/event.cnd b/src/main/resources/SLING-INF/nodetypes/event.cnd
new file mode 100644
index 0000000..52ec213
--- /dev/null
+++ b/src/main/resources/SLING-INF/nodetypes/event.cnd
@@ -0,0 +1,42 @@
+//

+//  Licensed to the Apache Software Foundation (ASF) under one

+//  or more contributor license agreements.  See the NOTICE file

+//  distributed with this work for additional information

+//  regarding copyright ownership.  The ASF licenses this file

+//  to you under the Apache License, Version 2.0 (the

+//  "License"); you may not use this file except in compliance

+//  with the License.  You may obtain a copy of the License at

+//

+//   http://www.apache.org/licenses/LICENSE-2.0

+//

+//  Unless required by applicable law or agreed to in writing,

+//  software distributed under the License is distributed on an

+//  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+//  KIND, either express or implied.  See the License for the

+//  specific language governing permissions and limitations

+//  under the License.

+//

+

+<slingevent='http://sling.apache.org/jcr/event/1.0'>

+<nt='http://www.jcp.org/jcr/nt/1.0'>

+<mix='http://www.jcp.org/jcr/mix/1.0'>

+

+[slingevent:Event] > nt:unstructured, nt:hierarchyNode

+  - slingevent:topic (string)

+  - slingevent:application (string)

+  - slingevent:created (date)

+  - slingevent:properties (binary)

+  

+[slingevent:Job] > slingevent:Event, mix:lockable

+  - slingevent:processor (string)

+  - slingevent:id (string)

+  - slingevent:finished (date)

+ 

+[slingevent:TimedEvent] > slingevent:Event, mix:lockable

+  - slingevent:processor (string)

+  - slingevent:id (string)

+  - slingevent:expression (string)

+  - slingevent:date (date)

+  - slingevent:period (long)

+

+  

diff --git a/src/test/java/org/apache/sling/event/EventUtilTest.java b/src/test/java/org/apache/sling/event/EventUtilTest.java
new file mode 100644
index 0000000..b55e229
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/EventUtilTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.InputStream;
+import java.util.Calendar;
+import java.util.Properties;
+
+import javax.jcr.PropertyType;
+import javax.jcr.RepositoryException;
+import javax.jcr.Value;
+import javax.jcr.ValueFactory;
+import javax.jcr.ValueFormatException;
+
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.jmock.integration.junit4.JUnit4Mockery;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.osgi.service.event.Event;
+
+/**
+ * Tests for the EventUtil utility methods.
+ */
+@RunWith(JMock.class)
+public class EventUtilTest {
+
+    protected Mockery context;
+
+    public EventUtilTest() {
+        this.context = new JUnit4Mockery();
+    }
+
+    @Test public void testDistributeFlag() {
+        final Event distributableEvent = EventUtil.createDistributableEvent("some/topic", null);
+        assertTrue(EventUtil.shouldDistribute(distributableEvent));
+        final Event nonDistributableEvent = new Event("another/topic", null);
+        assertFalse(EventUtil.shouldDistribute(nonDistributableEvent));
+    }
+
+    @Test public void testLocalFlag() {
+        final Event localEvent = new Event("local/event", null);
+        assertTrue(EventUtil.isLocal(localEvent));
+        final Properties props = new Properties();
+        props.put(EventUtil.PROPERTY_APPLICATION, "application1");
+        final Event remoteEvent = new Event("remote/event", props);
+        assertFalse(EventUtil.isLocal(remoteEvent));
+    }
+
+    @Test public void testGetNodePropertyValue() {
+        final ValueFactory factory = this.context.mock(ValueFactory.class);
+        this.context.checking(new Expectations() {{
+            allowing(factory).createValue(true);
+            will(returnValue(new ValueImpl(PropertyType.BOOLEAN)));
+            allowing(factory).createValue(false);
+            will(returnValue(new ValueImpl(PropertyType.BOOLEAN)));
+            allowing(factory).createValue(with(any(Long.class)));
+            will(returnValue(new ValueImpl(PropertyType.LONG)));
+            allowing(factory).createValue(with(any(String.class)));
+            will(returnValue(new ValueImpl(PropertyType.STRING)));
+            allowing(factory).createValue(with(any(Calendar.class)));
+            will(returnValue(new ValueImpl(PropertyType.DATE)));
+        }});
+        // boolean
+        assertEquals(PropertyType.BOOLEAN, EventUtil.getNodePropertyValue(factory, true).getType());
+        assertEquals(PropertyType.BOOLEAN, EventUtil.getNodePropertyValue(factory, false).getType());
+        assertEquals(PropertyType.BOOLEAN, EventUtil.getNodePropertyValue(factory, Boolean.TRUE).getType());
+        assertEquals(PropertyType.BOOLEAN, EventUtil.getNodePropertyValue(factory, Boolean.FALSE).getType());
+        // long
+        assertEquals(PropertyType.LONG, EventUtil.getNodePropertyValue(factory, (long)5).getType());
+        // int = not possible
+        assertEquals(null, EventUtil.getNodePropertyValue(factory, 5));
+        // string
+        assertEquals(PropertyType.STRING, EventUtil.getNodePropertyValue(factory, "something").getType());
+        // calendar
+        assertEquals(PropertyType.DATE, EventUtil.getNodePropertyValue(factory, Calendar.getInstance()).getType());
+    }
+
+    private final static class ValueImpl implements Value {
+
+        private final int type;
+
+        public ValueImpl(int type) {
+            this.type = type;
+        }
+
+        public boolean getBoolean() throws ValueFormatException,
+                IllegalStateException, RepositoryException {
+            return false;
+        }
+
+        public Calendar getDate() throws ValueFormatException,
+                IllegalStateException, RepositoryException {
+            return null;
+        }
+
+        public double getDouble() throws ValueFormatException,
+                IllegalStateException, RepositoryException {
+            return 0;
+        }
+
+        public long getLong() throws ValueFormatException,
+                IllegalStateException, RepositoryException {
+            return 0;
+        }
+
+        public InputStream getStream() throws IllegalStateException,
+                RepositoryException {
+            return null;
+        }
+
+        public String getString() throws ValueFormatException,
+                IllegalStateException, RepositoryException {
+            return null;
+        }
+
+        public int getType() {
+            return this.type;
+        }
+    }
+}
diff --git a/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java b/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
new file mode 100644
index 0000000..fb0d801
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.observation.EventListenerIterator;
+
+import org.apache.sling.commons.testing.jcr.RepositoryUtil;
+import org.apache.sling.commons.threads.ThreadPoolConfig;
+import org.apache.sling.engine.SlingSettingsService;
+import org.apache.sling.event.ThreadPool;
+import org.apache.sling.jcr.api.SlingRepository;
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.junit.runner.RunWith;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+@RunWith(JMock.class)
+public abstract class AbstractRepositoryEventHandlerTest {
+
+    protected AbstractRepositoryEventHandler handler;
+
+    protected static final String REPO_PATH = "/test/events";
+    protected static final String SLING_ID = "4711";
+
+    protected static Session session;
+
+    protected abstract Mockery getMockery();
+
+    protected Dictionary<String, Object> getComponentConfig() {
+        final Dictionary<String, Object> config = new Hashtable<String, Object>();
+        config.put(AbstractRepositoryEventHandler.CONFIG_PROPERTY_REPO_PATH, REPO_PATH);
+
+        return config;
+    }
+
+    @org.junit.BeforeClass public static void setupRepository() throws Exception {
+        RepositoryUtil.startRepository();
+        final SlingRepository repository = RepositoryUtil.getRepository();
+        session = repository.loginAdministrative(repository.getDefaultWorkspace());
+        assertTrue(RepositoryUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/event.cnd")));
+        assertTrue(RepositoryUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/folder.cnd")));
+    }
+
+    @org.junit.AfterClass public static void shutdownRepository() throws Exception {
+        RepositoryUtil.stopRepository();
+    }
+
+    @org.junit.Before public void setup() throws Exception {
+        this.handler.repository = RepositoryUtil.getRepository();
+
+        // the event admin
+        final EventAdmin eventAdmin = this.getMockery().mock(EventAdmin.class);
+        this.handler.eventAdmin = eventAdmin;
+        this.getMockery().checking(new Expectations() {{
+            allowing(eventAdmin).postEvent(with(any(Event.class)));
+            allowing(eventAdmin).sendEvent(with(any(Event.class)));
+        }});
+
+        // sling settings service
+        this.handler.settingsService = new SlingSettingsService() {
+            public String getSlingId() {
+                return SLING_ID;
+            }
+        };
+
+        // we need a thread pool
+        this.handler.threadPool = new ThreadPoolImpl();
+
+        // lets set up the bundle context
+        final BundleContext bundleContext = this.getMockery().mock(BundleContext.class);
+
+        // lets set up the component configuration
+        final Dictionary<String, Object> componentConfig = this.getComponentConfig();
+
+        // lets set up the compnent context
+        final ComponentContext componentContext = this.getMockery().mock(ComponentContext.class);
+        this.getMockery().checking(new Expectations() {{
+            allowing(componentContext).getBundleContext();
+            will(returnValue(bundleContext));
+            allowing(componentContext).getProperties();
+            will(returnValue(componentConfig));
+        }});
+
+        this.handler.activate(componentContext);
+        // the session is initialized in the background, so let's sleep some seconds
+        Thread.sleep(2 * 1000);
+    }
+
+    @org.junit.After public void shutdown() throws Exception {
+        // delete all child nodes to get a clean repository again
+        final Node rootNode = (Node) session.getItem(this.handler.repositoryPath);
+        final NodeIterator iter = rootNode.getNodes();
+        while ( iter.hasNext() ) {
+            final Node child = iter.nextNode();
+            child.remove();
+        }
+        rootNode.save();
+        // lets set up the bundle context with the sling id
+        final BundleContext bundleContext = this.getMockery().mock(BundleContext.class);
+
+        final ComponentContext componentContext = this.getMockery().mock(ComponentContext.class);
+        this.getMockery().checking(new Expectations() {{
+            allowing(componentContext).getBundleContext();
+            will(returnValue(bundleContext));
+        }});
+        this.handler.deactivate(componentContext);
+    }
+
+    @org.junit.Test public void testSetup() throws RepositoryException {
+        assertEquals(this.handler.applicationId, SLING_ID);
+        assertEquals(this.handler.repositoryPath, REPO_PATH);
+        assertNotNull(this.handler.writerSession);
+        final EventListenerIterator iter = this.handler.writerSession.getWorkspace().getObservationManager().getRegisteredEventListeners();
+        boolean found = false;
+        while ( !found && iter.hasNext() ) {
+            final javax.jcr.observation.EventListener listener = iter.nextEventListener();
+            found = (listener == this.handler);
+        }
+        assertTrue("Handler is not registered as event listener.", found);
+    }
+
+    @org.junit.Test public void testPathCreation() throws RepositoryException {
+        assertTrue(session.itemExists(REPO_PATH));
+    }
+
+    final class ThreadPoolImpl implements ThreadPool {
+
+        public void execute(Runnable runnable) {
+            final Thread t = new Thread(runnable);
+            t.start();
+        }
+
+        public String getName() {
+            return EventHelper.THREAD_POOL_NAME;
+        }
+
+        public void shutdown() {
+            // nothing to do
+        }
+
+        public ThreadPoolConfig getConfiguration() {
+            return new ThreadPoolConfig();
+        }
+
+    }
+}
diff --git a/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java b/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java
new file mode 100644
index 0000000..7ad96d4
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Calendar;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+
+import org.apache.jackrabbit.util.ISO9075;
+import org.apache.sling.event.EventUtil;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.jmock.integration.junit4.JUnit4Mockery;
+import org.junit.runner.RunWith;
+import org.osgi.service.event.Event;
+
+@RunWith(JMock.class)
+public class DistributingEventHandlerTest extends AbstractRepositoryEventHandlerTest {
+
+    protected Mockery context;
+
+    public DistributingEventHandlerTest() {
+        this.handler = new DistributingEventHandler();
+        this.context = new JUnit4Mockery();
+    }
+
+    @Override
+    protected Mockery getMockery() {
+        return this.context;
+    }
+
+    @org.junit.Test public void testWriteEvent() throws Exception {
+        final String topic = "write/event/test";
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put("a property", "some value");
+        final Event e = new Event(topic, props);
+        this.handler.writeEvent(e, null);
+
+        final Node rootNode = (Node) session.getItem(this.handler.repositoryPath);
+        final NodeIterator iter = rootNode.getNodes();
+        iter.hasNext();
+        final Node eventNode = iter.nextNode();
+        assertEquals(topic, eventNode.getProperty(EventHelper.NODE_PROPERTY_TOPIC).getString());
+        assertEquals(handler.applicationId, eventNode.getProperty(EventHelper.NODE_PROPERTY_APPLICATION).getString());
+        assertTrue(Calendar.getInstance().compareTo(eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getDate()) >= 0);
+        // as a starting point we just check if the properties property exists
+        assertTrue(eventNode.hasProperty(ISO9075.encode("a property")));
+    }
+
+    @org.junit.Test public void testWriteEventPlusAppId() throws Exception {
+        final String topic = "write/event/test";
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put("a property", "some value");
+        // now we check if the application id is handled correctly
+        props.put(EventUtil.PROPERTY_APPLICATION, "foo");
+        this.handler.writeEvent(new Event(topic, props), null);
+        final Node rootNode = (Node) session.getItem(this.handler.repositoryPath);
+        final NodeIterator iter = rootNode.getNodes();
+        iter.hasNext();
+        final Node eventNode = iter.nextNode();
+        assertEquals(topic, eventNode.getProperty(EventHelper.NODE_PROPERTY_TOPIC).getString());
+        assertEquals(handler.applicationId, eventNode.getProperty(EventHelper.NODE_PROPERTY_APPLICATION).getString());
+        assertTrue(Calendar.getInstance().compareTo(eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getDate()) >= 0);
+        // as a starting point we just check if the properties property exists
+        assertTrue(eventNode.hasProperty(ISO9075.encode("a property")));
+    }
+}