SLING-5646 MoM API and JMS implementation with example usage by Jobs implementation.
Squashes 27 commits from https://github.com/ieb/sling/tree/jobs_28 as follows.
Added first stab at a message oriented job subsystem
Added basic implementation for the manager keeping queue implementation details abstracted
Added ActiveMQ implementation for queues and topics, and fixed a number of the the SPI interfaces in the process
Basic Test coverate for OOT JMS Broker
Extracted a MOM API with no Jobs or JMS references, test coverage for ActiveMQ impl is 100% class, 93% method, 75% line
Added missing license headers, documentation and cleaned out unused  interfaces
Fixed JMS Transaction issue found by @tmaret
Coverage for the majority of the jobs code is complete
Basic unit test coverage complete, core has 94% by lines, AMQ 74%, 100% classes and methods
Added testing environment for a runnign server, Not working yet
Added ability for detect when the OSGi container has completed bundle startup without having to perform http requests
Start at IT testing with Crankstart
Fixed issues with shutdown inside a Crankstart container
Working Crankstart IT framework
Version that uses Q->Jobs->JobConsumer pattern
Added a Queue Factory to allow configuration of multiple queues between the MOM API and Job Subsystem and move JobConsumers to register with a Job type
Migrated Subscribers and QueueReaders to a OSGi whiteboard pattern after discussion on Sling Dev
Changes the JobConsumer to use a Callback rather than return a job. This was suggested offlist by others in Adobe as a way of improving resource consumption
Added Types to improve type safety in certain areas after suggestions offlist
Fixed issue with OSGi startup in IntelliJ caused by version 4 of the
Felix framework bundle being present inside the maven pom. Strangely a
command line build was not impacted.
Added integration test bundle to test service. Adjusted some of the APIs to make using the Job Sub System easier
Integration tests now starting jobs from messages
Fixed Startup to work in real Sling/AEM container. The Active MQ OSGi bundle contains additional dependencies that cause all sorts of problems, AMQ is now being embedded into the AMQ MOM Impl bundle.
Fixed Queue expriy bug and added AEM Fiddle to run jobs
Added Documentation for configuration and default sample configuration
Added Explicit requeue mechanims rather than relying on AMQ's requeue capabilities
Moved MoM to new Home

git-svn-id: https://svn.apache.org/repos/asf/sling/trunk@1754255 13f79535-47bb-0310-9956-ffa450edef68
16 files changed
tree: 5d5a0bb25633df8b835b8eb9aa0f39af5dcbbf9d
  1. src/
  2. .gitignore
  3. pom.xml
  4. README.md
README.md

Message Oriented Middleware API implementation using Active MQ.

This bundle implements the MoM API using Active MQ. It supports both Pub/Sub and Queue patterns and will run out of the box, embedded or connected to a dedicated cluster.

Out of the Box.

As the name suggests, no configuration is required, the components will start, create an ActiveMQ instance embedded inside the OSGi container and run. On restart, normal or after a crash, provided the working directory is not changed or modified, the ActiveMQ embedded server will restart and resume operations. In the event the JVM crashes, ActiveMQ will perform recovery by replaying its journal. The embedded server uses KahaDB to store data on local disk.

Embedded with custom configuration.

AMQ can be run embedded with custom configuration to allow a cluster of Sling instances for form a multi master AMQ cluster with each Sling instance embedding its own AMQ broker. This is achieved via OSGi configuration. (ToDo: Config + Doc)

External AMQ Broker cluster.

The bundle can be run to use an external AMQ Broker cluster, maintained and setup separately from the Sling cluster. To do this, modify the Broker URL via OSGi configuration.

Implementation details.

AMQ ConnectionFactory.

Running AMQ inside OSGi is very simple. All that is required is the AMQ dependencies, and instancing an AQM PooledConnectionFactory with a default localhost url. vm://localhost:61616. The PooledConnectionFactory will trigger the creation of an AMQ Broker if one is not present and AMQ will run normally. This service implements an internal service API org.apache.sling.jms.ConnectionFactoryService, which enables consumers to get a JMS ConnectionFactory.

MoM API implementation

The MoM API implementation uses the ConnectionFactory service to get a JMS connection. It opens JMS sessions using that connection factory and implements the methods in the API. JMS Support both Pub/Sub and Queue patterns in the MoM API without much additional work. The JMS sessions are single threaded, so care is taken not to share between threads or cause throughput issues with synchronization.

The Map of Map messages in the MoM API are serialised to Json using the Gson library and transmitted as Text messages. JMS Headers are currently not used other than to identify the JSON encoding of the text payload.

The delivery of messages on Topics and Queues is entirely managed by AMQ with no additional code. The retry semantics of the QueueReader API is achieved by dispatching JMS messages from within a JMS MessageListener onMessage method, and throwing an IllegalArgumentException to JMS when a message needs to be re-queued. How retries work and the backoff algorithm used for messages that need to be retried is managed by ActiveMQ configuration which supports many scenarios for retrying messages.

Delivery Retry for Queued messages

The semantics of the MoM API is that a consumer may throw an exception when its QueueReader.onMessage method is called. That indicates that the message could not be consumed at this time and should be retried. There are several ways that this can be achieved in general, and some AMQ specific ways. By default JMS ensures delivery order. Hence a message on the queue that is not dequeued, will block other messages on the queue until it is dequeued. AMQ deals with this by allowing a deployyer to configure queues to retry at the broker rather than attempting to redeliver in order to the same JMS consumer. The configuration is not default and has to be provided by configuring the AMQ broker.

<broker xmlns="http://activemq.apache.org/schema/core"    schedulerSupport="true" >
    .... 
    <plugins>
        <redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
            <redeliveryPolicyMap>
                <redeliveryPolicyMap>
                    <redeliveryPolicyEntries>
                        <!-- a destination specific policy -->
                        <redeliveryPolicy queue="SpecialQueue" maximumRedeliveries="4" redeliveryDelay="10000" />
                    </redeliveryPolicyEntries>
                    <!-- the fallback policy for all other destinations -->
                    <defaultEntry>
                        <redeliveryPolicy maximumRedeliveries="4" initialRedeliveryDelay="5000" redeliveryDelay="10000" />
                    </defaultEntry>
                </redeliveryPolicyMap>
            </redeliveryPolicyMap>
        </redeliveryPlugin>
    </plugins>

This can also be achieved in code, by dequeing all messages regardless of failiure or not. Those generate an exception on dequeued get requeued. If the size of the queue is so large as to significantly impact processing due to delays in processing the queue, then an alternative approach is to requeue to a special retry queue, ensuring that retries get a higher level of priority. This may not be necessary, as retries happen due to unavailability, and if the queue is long, then resources will be available, so no retry. If the queue is short, then the re-queue time is minimal. The approach is quite simular to the approach used in AMQ 5.7 and later, although it will work with any JMS provider.

The code base is currently configured to use and explicit dequeue and requeue approach that does not depend on features of the JMS provider.