// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= JMS Streamer
== Overview
Ignite offers a JMS Data Streamer to consume messages from JMS brokers, convert them into cache tuples and insert them in Ignite.
This data streamer supports the following features:
* Consumes from queues or topics.
* Supports durable subscriptions from topics.
* Concurrent consumers are supported via the `threads` parameter.
** When consuming from queues, this component starts as many `Session` objects as there are configured threads, each with its own `MessageListener` instance, therefore achieving _natural_ concurrency.
** When consuming from topics, the component cannot start multiple subscribers, because each one would receive its own copy of every message and the streamer would consume duplicates. Therefore, concurrency is achieved in a _virtualized_ manner through an internal thread pool.
* Transacted sessions are supported through the `transacted` parameter.
* Batched consumption is possible via the `batched` parameter, which groups message reception within the scope of a local JMS transaction (XA is not supported). Depending on the broker, this technique can provide higher throughput because it reduces the number of message acknowledgment round trips, albeit at the expense of possible duplicate messages (especially if an incident occurs in the middle of a transaction).
** Batches are committed when the `batchClosureMillis` time has elapsed, or when a Session has received at least `batchClosureSize` messages.
** Time-based closure fires with the specified frequency and applies to all ``Session``s in parallel.
** Size-based closure applies to each individual `Session` (as transactions are `Session`-bound in JMS), so it will fire when that `Session` has processed that many messages.
** Both options are compatible with each other. You can disable either, but not both if batching is enabled.
* Supports specifying the destination with implementation-specific `Destination` objects or with names.
We have tested our implementation against http://activemq.apache.org[Apache ActiveMQ, window=_blank], but any JMS broker
is supported as long as its client library implements the http://download.oracle.com/otndocs/jcp/7195-jms-1.1-fr-spec-oth-JSpec/[JMS 1.1 specification, window=_blank].
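
The batch closure rules listed above can be modelled in a few lines of plain Java. The class below is a hypothetical sketch for illustration, not Ignite's actual implementation: the name `BatchClosurePolicy`, its methods, and the convention that a non-positive value disables a criterion are all assumptions. It also simplifies the periodic timer to an "elapsed milliseconds" check.

```java
/** Hypothetical model of the batch closure rules; not part of the Ignite API. */
final class BatchClosurePolicy {
    /** Time-based threshold in milliseconds; a non-positive value disables it (assumption). */
    private final long closureMillis;

    /** Size-based threshold in messages; a non-positive value disables it (assumption). */
    private final int closureSize;

    BatchClosurePolicy(long closureMillis, int closureSize) {
        // With batching enabled, at least one closure criterion has to stay active.
        if (closureMillis <= 0 && closureSize <= 0)
            throw new IllegalArgumentException("Enable time-based or size-based closure (or both)");

        this.closureMillis = closureMillis;
        this.closureSize = closureSize;
    }

    /** Size-based closure is evaluated separately for each Session. */
    boolean sizeReached(int messagesInSession) {
        return closureSize > 0 && messagesInSession >= closureSize;
    }

    /** Time-based closure applies to every Session at once. */
    boolean timeElapsed(long millisSinceLastClosure) {
        return closureMillis > 0 && millisSinceLastClosure >= closureMillis;
    }

    /** A Session commits its batch as soon as either criterion is met. */
    boolean shouldCommit(int messagesInSession, long millisSinceLastClosure) {
        return sizeReached(messagesInSession) || timeElapsed(millisSinceLastClosure);
    }
}
```

With `new BatchClosurePolicy(1000, 50)`, for instance, a `Session` that has received 50 messages would commit right away, while a `Session` holding 10 messages would wait until the time-based closure fires.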
== Instantiating JMS Streamer
When you instantiate the JMS Streamer, you will need to specify the following generic types:
* `T extends Message` \=> the type of JMS `Message` this streamer will receive. If it can receive multiple message types, use the generic `Message` type.
* `K` \=> the type of the cache key.
* `V` \=> the type of the cache value.
To configure the JMS streamer, you will need to provide the following compulsory properties:
* `connectionFactory` \=> an instance of your `ConnectionFactory` duly configured as required by the broker. It can be a pooled `ConnectionFactory`.
* `destination` or (`destinationName` and `destinationType`) \=> a `Destination` object (normally a broker-specific implementation of the JMS `Queue` or `Topic` interfaces), or the combination of a destination name (queue or topic name) and the type as a `Class` reference to either `Queue` or `Topic`. In the latter case, the streamer will use either `Session.createQueue(String)` or `Session.createTopic(String)` to get a hold of the destination.
* `transformer` \=> an implementation of `MessageTransformer<T, K, V>` that digests a JMS message of type `T` and produces a `Map<K, V>` of cache entries to add. It can also return `null` or an empty `Map` to ignore the incoming message.
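
As a rough configuration fragment, the properties above might be wired together as follows when the destination is given by name rather than as an object. The queue name `ignite.people` and the thread and batch values are made up for illustration. The setter names are assumed to follow the JavaBean convention for the properties and parameters described on this page, so check them against the `JmsStreamer` Javadoc for your version. The variables `ignite`, `dataStreamer`, `connectionFactory`, and `transformer` are assumed to have been created elsewhere.

```java
import javax.jms.Queue;
import javax.jms.TextMessage;
import org.apache.ignite.stream.jms11.JmsStreamer;

JmsStreamer<TextMessage, String, String> jmsStreamer = new JmsStreamer<>();
jmsStreamer.setIgnite(ignite);
jmsStreamer.setStreamer(dataStreamer);

// compulsory properties
jmsStreamer.setConnectionFactory(connectionFactory);
jmsStreamer.setTransformer(transformer);

// destination by name and type instead of a Destination object
jmsStreamer.setDestinationType(Queue.class);
jmsStreamer.setDestinationName("ignite.people"); // hypothetical queue name

// optional tuning: concurrent consumers and batched consumption (illustrative values)
jmsStreamer.setThreads(4);
jmsStreamer.setTransacted(true);
jmsStreamer.setBatched(true);
jmsStreamer.setBatchClosureMillis(1000);
jmsStreamer.setBatchClosureSize(50);
```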
== Example
The example in this section populates a cache with `String` keys and `String` values, consuming `TextMessage` objects whose body has this format:
----
raulk,Raul Kripalani
dsetrakyan,Dmitriy Setrakyan
sv,Sergi Vladykin
gm,Gianfranco Murador
----
Here is the code:
[tabs]
--
tab:Java[]
[source,java]
----
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.stream.jms11.JmsStreamer;
import org.apache.ignite.stream.jms11.MessageTransformer;

// `ignite`, `connectionFactory`, `destination` and `LOG` are assumed to be defined elsewhere

// create a data streamer
IgniteDataStreamer<String, String> dataStreamer = ignite.dataStreamer("mycache");
dataStreamer.allowOverwrite(true);

// create a JMS streamer and plug the data streamer into it
JmsStreamer<TextMessage, String, String> jmsStreamer = new JmsStreamer<>();
jmsStreamer.setIgnite(ignite);
jmsStreamer.setStreamer(dataStreamer);
jmsStreamer.setConnectionFactory(connectionFactory);
jmsStreamer.setDestination(destination);
jmsStreamer.setTransacted(true);

// turn each "key,value" line of the message body into one cache entry
jmsStreamer.setTransformer(new MessageTransformer<TextMessage, String, String>() {
    @Override
    public Map<String, String> apply(TextMessage message) {
        final Map<String, String> answer = new HashMap<>();
        String text;
        try {
            text = message.getText();
        }
        catch (JMSException e) {
            LOG.warn("Could not parse message.", e);
            // an empty map tells the streamer to ignore this message
            return Collections.emptyMap();
        }
        for (String s : text.split("\n")) {
            String[] tokens = s.split(",");
            answer.put(tokens[0], tokens[1]);
        }
        return answer;
    }
});

jmsStreamer.start();

// on application shutdown
jmsStreamer.stop();
dataStreamer.close();
----
--
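
The transformer above assumes that every line contains a comma. As a hedged sketch, the parsing step could be moved into a small helper that skips malformed lines instead of throwing. The class name `KeyValueLines` and its `parse` method are hypothetical and not part of Ignite. The helper uses only the JDK, so it can be tested without a broker.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: parses "key,value" lines into a map. */
final class KeyValueLines {
    private KeyValueLines() {
        // static helper, not meant to be instantiated
    }

    static Map<String, String> parse(String text) {
        Map<String, String> entries = new LinkedHashMap<>();

        if (text == null)
            return entries;

        // \R matches any line break, so both "\n" and "\r\n" are handled
        for (String line : text.split("\\R")) {
            // limit 2: only the first comma separates key from value
            String[] tokens = line.split(",", 2);

            // skip blank lines and lines without a comma
            if (tokens.length < 2)
                continue;

            String key = tokens[0].trim();

            // skip lines whose key is empty
            if (key.isEmpty())
                continue;

            entries.put(key, tokens[1].trim());
        }

        return entries;
    }
}
```

Inside `apply`, the `for` loop could then be replaced with `return KeyValueLines.parse(text);`.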
To use this component, you have to import the following module through your build system (Maven, Ivy, Gradle, sbt, etc.):
[tabs]
--
tab:pom.xml[]
[source,xml]
----
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-jms11-ext</artifactId>
    <version>${ignite-jms11-ext.version}</version>
</dependency>
----
--