<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<!DOCTYPE html>

      
      
<html lang="en">
<head>
    <!-- Global site tag (gtag.js) - Google Analytics -->
    <script async src="https://www.googletagmanager.com/gtag/js?id=UA-1382082-1"></script>
    <script>
    window.dataLayer = window.dataLayer || [];
    function gtag(){dataLayer.push(arguments);}
    gtag('js', new Date());

    gtag('config', 'UA-61232409-1');
    </script>

    

    <meta charset="UTF-8">
    <title>JMS Streamer | Ignite Documentation</title>
    
    <link rel="canonical" href="/docs/jms/jms-streamer" />
    
	
	<link rel="stylesheet" href="/assets/css/styles.css?1651672546">
    <link rel="stylesheet" href="/assets/css/asciidoc-pygments.css">
    <link rel="shortcut icon" href="/favicon.ico">
    <meta name='viewport' content='width=device-width, height=device-height, initial-scale=1.0, minimum-scale=1.0'>

	<link rel="stylesheet"
	  href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">

    <script type="text/javascript" src="https://ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
    <script type="text/javascript" src="/assets/js/jquery.swiftype.autocomplete.js?1651672546"></script>
    <script type="text/javascript" src="/assets/js/anchor.min.js?1651672546"></script>
    

</head>
<body>
    <!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->

<header>
    <div class="container">
        <button type='button' class='menu' title='Docs menu'>
          <img src="/assets/images/menu-icon.svg"/>
        </button>

        <div class='home'>
            <a href="/" class='home' title='Apache Ignite home'>
                <img src="/assets/images/apache_ignite_logo.svg" alt="Apache Ignite logo" width="103" height="36" >
            </a>
        </div>

        <nav>

        </nav>
        <form class='search'>
            <button class="search-close" type='button'><img src='/assets/images/cancel.svg'></button>
            <input type="search" placeholder="Search…" id="search-input">
        </form>
        <button type='button' class='search-toggle'><img src='/assets/images/search.svg'></button>
        <button type='button' class='top-nav-toggle'>⋮</button>
        <a href="https://github.com/ignite" title='GitHub' class='github' target="_blank">
            <img src="/assets/images/github-gray.svg" alt="GitHub logo">
        </a>
    </div>
</header>


    <!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
	 <link rel="stylesheet" href="/assets/css/docs.css">
<section class='page-docs'>
    <!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->







<nav class='left-nav' data-swiftype-index='false'>

    
    <li>
        

<a href="/docs/extensions/aws/aws" class='' >Amazon S3 IP Finder</a>

</li>

    <li>
        

<a href="/docs/extensions/camel/camel-streamer" class='' >Apache Camel Streamer</a>

</li>

    <li>
        

<a href="/docs/extensions/flink/flink-streamer" class='' >Apache Flink Streamer</a>

</li>

    <li>
        

<a href="/docs/extensions/flume/flume-sink" class='' >Apache Flume Sink</a>

</li>

    <li>
        

<a href="/docs/extensions/azure/azure" class='' >Apache Ignite Azure Module</a>

</li>

    <li>
        

<a href="/docs/extensions/gce/gce" class='' >Apache Ignite GCE Module</a>

</li>

    <li>
        

<a href="/docs/extensions/pub-sub/pub-sub" class='' >Apache Ignite Pub/Sub Module</a>

</li>

    <li>
        

<a href="/docs/extensions/spring/spring-boot" class='' >Apache Ignite and Spring Boot</a>

</li>

    <li>
        

<a href="/docs/extensions/spring/spring-data" class='' >Apache Ignite and Spring Data</a>

</li>

    <li>
        

<a href="/docs/extensions/spring/spring-caching" class='' >Apache Ignite and Spring Cache</a>

</li>

    <li>
        

<a href="/docs/extensions/spring/spring-tx" class='' >Apache Ignite and Spring Transactions</a>

</li>

    <li>
        

<a href="/docs/extensions/kafka/kafka-streamer" class='' >Apache Kafka Streamer</a>

</li>

    <li>
        

<a href="/docs/extensions/storm/storm-streamer" class='' >Apache Storm Streamer</a>

</li>

    <li>
        

<a href="/docs/extensions/cdc/change-data-capture-extensions" class='' >Change Data Capture Extension</a>

</li>

    <li>
        

<a href="/docs/extensions/jms/jms-streamer" class='' >JMS Streamer</a>

</li>

    <li>
        

<a href="/docs/extensions/mqtt/mqtt-streamer" class='' >MQTT Streamer</a>

</li>

    <li>
        

<a href="/docs/extensions/perf-statistics/performance-statistics" class='' >Performance Statistics Extension</a>

</li>

    <li>
        

<a href="/docs/extensions/rocketmq/rocketmq-streamer" class='' >RocketMQ Streamer</a>

</li>

    <li>
        

<a href="/docs/extensions/topology-validator/topology-validator" class='' >Topology Validator</a>

</li>

    <li>
        

<a href="/docs/extensions/twitter/twitter-streamer" class='' >Twitter Streamer</a>

</li>

    <li>
        

<a href="/docs/extensions/zeromq/zeromq-streamer" class='' >ZeroMQ Streamer</a>

</li>

    <li>
        

<a href="/docs/extensions/zookeeper/zookeeper-ip" class='' >ZooKeeper IP Finder</a>

</li>

</nav>
<div class="left-nav__overlay"></div>


    <article data-swiftype-index='true'>
        <a class='edit-link' href="/_docs/jms/jms-streamer.adoc" target="_blank">Edit</a>
        
            <h1>JMS Streamer</h1>
        
        <div class="sect1">
<h2 id="overview">Overview</h2>
<div class="sectionbody">
<div class="paragraph">
<p>Ignite offers a JMS Data Streamer to consume messages from JMS brokers, convert them into cache tuples and insert them in Ignite.</p>
</div>
<div class="paragraph">
<p>This data streamer supports the following features:</p>
</div>
<div class="ulist">
<ul>
<li>
<p>Consumes from queues or topics.</p>
</li>
<li>
<p>Supports durable subscriptions from topics.</p>
</li>
<li>
<p>Concurrent consumers are supported via the <code>threads</code> parameter.</p>
<div class="ulist">
<ul>
<li>
<p>When consuming from queues, this component will start as many <code>Session</code> objects with separate <code>MessageListener</code> instances each, therefore achieving <em>natural</em> concurrency.</p>
</li>
<li>
<p>When consuming from topics, obviously we cannot start multiple threads as that would lead us to consume duplicate messages. Therefore, we achieve concurrency in a <em>virtualized</em> manner through an internal thread pool.</p>
</li>
</ul>
</div>
</li>
<li>
<p>Transacted sessions are supported through the <code>transacted</code> parameter.</p>
</li>
<li>
<p>Batched consumption is possible via the <code>batched</code> parameter, which groups message reception within the scope of a local JMS transaction (XA not used supported). Depending on the broker, this technique can provide a higher throughput as it decreases the amount of message acknowledgment​ round trips that are necessary, albeit at the expense possible duplicate messages (especially if an incident occurs in the middle of a transaction).</p>
<div class="ulist">
<ul>
<li>
<p>Batches are committed when the <code>batchClosureMillis</code> time has elapsed, or when a Session has received at least <code>batchClosureSize</code> messages.</p>
</li>
<li>
<p>Time-based closure fires with the specified frequency and applies to all <code>Session</code>s in parallel.</p>
</li>
<li>
<p>Size-based closure applies to each individual <code>Session</code> (as transactions are <code>Session-bound</code> in JMS), so it will fire when that <code>Session</code> has processed that many messages.</p>
</li>
<li>
<p>Both options are compatible with each other. You can disable either, but not both if batching is enabled.</p>
</li>
</ul>
</div>
</li>
<li>
<p>Supports specifying the destination with implementation-specific <code>Destination</code> objects or with names.</p>
</li>
</ul>
</div>
<div class="paragraph">
<p>We have tested our implementation against <a href="http://activemq.apache.org" target="_blank" rel="noopener">Apache ActiveMQ</a>, but any JMS broker
is supported as long as it client library implements the <a href="http://download.oracle.com/otndocs/jcp/7195-jms-1.1-fr-spec-oth-JSpec/" target="_blank" rel="noopener">JMS 1.1 specification</a>.</p>
</div>
</div>
</div>
<div class="sect1">
<h2 id="instantiating-jms-streamer">Instantiating JMS Streamer</h2>
<div class="sectionbody">
<div class="paragraph">
<p>When you instantiate the JMS Streamer, you will need to concretize​ the following generic types:</p>
</div>
<div class="ulist">
<ul>
<li>
<p><code>T extends Message</code> =&gt; the type of JMS <code>Message</code> this streamer will receive. If it can receive multiple, use the generic <code>Message</code> type.</p>
</li>
<li>
<p><code>K</code> =&gt; the type of the cache key.</p>
</li>
<li>
<p><code>V</code> =&gt; the type of the cache value.</p>
</li>
</ul>
</div>
<div class="paragraph">
<p>To configure the JMS streamer, you will need to provide the following compulsory properties:</p>
</div>
<div class="ulist">
<ul>
<li>
<p><code>connectionFactory</code> =&gt; an instance of your <code>ConnectionFactory</code> duly configured as required by the broker. It can be a pooled <code>ConnectionFactory</code>.</p>
</li>
<li>
<p><code>destination</code> or (<code>destinationName</code> and <code>destinationType</code>) =&gt; a <code>Destination</code> object (normally a broker-specific implementation of the JMS <code>Queue</code> or <code>Topic</code> interfaces), or the combination of a destination name (queue or topic name) and the type as a <code>Class</code> reference to either <code>Queue</code> or <code>Topic</code>. In the latter case, the streamer will use either <code>Session.createQueue(String)</code> or <code>Session.createTopic(String)</code> to get a hold of the destination.</p>
</li>
<li>
<p><code>transformer</code> =&gt; an implementation of <code>MessageTransformer&lt;T, K, V&gt;</code> that digests a JMS message of type <code>T</code> and produces a <code>Map&lt;K, V&gt;</code> of cache entries to add. It can also return <code>null</code> or an empty <code>Map</code> to ignore the incoming message.</p>
</li>
</ul>
</div>
</div>
</div>
<div class="sect1">
<h2 id="example">Example</h2>
<div class="sectionbody">
<div class="paragraph">
<p>The example in this section populates a cache with <code>String</code> keys and <code>String</code> values, consuming <code>TextMessages</code> with this format:</p>
</div>
<div class="listingblock">
<div class="content">
<pre>raulk,Raul Kripalani
dsetrakyan,Dmitriy Setrakyan
sv,Sergi Vladykin
gm,Gianfranco Murador</pre>
</div>
</div>
<div class="paragraph">
<p>Here is the code:</p>
</div>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="c1">// create a data streamer</span>
<span class="nc">IgniteDataStreamer</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;</span> <span class="n">dataStreamer</span> <span class="o">=</span> <span class="n">ignite</span><span class="o">.</span><span class="na">dataStreamer</span><span class="o">(</span><span class="s">"mycache"</span><span class="o">));</span>
<span class="n">dataStreamer</span><span class="o">.</span><span class="na">allowOverwrite</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>

<span class="c1">// create a JMS streamer and plug the data streamer into it</span>
<span class="nc">JmsStreamer</span><span class="o">&lt;</span><span class="nc">TextMessage</span><span class="o">,</span> <span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;</span> <span class="n">jmsStreamer</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">JmsStreamer</span><span class="o">&lt;&gt;();</span>
<span class="n">jmsStreamer</span><span class="o">.</span><span class="na">setIgnite</span><span class="o">(</span><span class="n">ignite</span><span class="o">);</span>
<span class="n">jmsStreamer</span><span class="o">.</span><span class="na">setStreamer</span><span class="o">(</span><span class="n">dataStreamer</span><span class="o">);</span>
<span class="n">jmsStreamer</span><span class="o">.</span><span class="na">setConnectionFactory</span><span class="o">(</span><span class="n">connectionFactory</span><span class="o">);</span>
<span class="n">jmsStreamer</span><span class="o">.</span><span class="na">setDestination</span><span class="o">(</span><span class="n">destination</span><span class="o">);</span>
<span class="n">jmsStreamer</span><span class="o">.</span><span class="na">setTransacted</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
<span class="n">jmsStreamer</span><span class="o">.</span><span class="na">setTransformer</span><span class="o">(</span><span class="k">new</span> <span class="nc">MessageTransformer</span><span class="o">&lt;</span><span class="nc">TextMessage</span><span class="o">,</span> <span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;()</span> <span class="o">{</span>
    <span class="nd">@Override</span>
    <span class="kd">public</span> <span class="nc">Map</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;</span> <span class="nf">apply</span><span class="o">(</span><span class="nc">TextMessage</span> <span class="n">message</span><span class="o">)</span> <span class="o">{</span>
        <span class="kd">final</span> <span class="nc">Map</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;</span> <span class="n">answer</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">HashMap</span><span class="o">&lt;&gt;();</span>
        <span class="nc">String</span> <span class="n">text</span><span class="o">;</span>
        <span class="k">try</span> <span class="o">{</span>
            <span class="n">text</span> <span class="o">=</span> <span class="n">message</span><span class="o">.</span><span class="na">getText</span><span class="o">();</span>
        <span class="o">}</span>
        <span class="k">catch</span> <span class="o">(</span><span class="nc">JMSException</span> <span class="n">e</span><span class="o">)</span> <span class="o">{</span>
            <span class="no">LOG</span><span class="o">.</span><span class="na">warn</span><span class="o">(</span><span class="s">"Could not parse message."</span><span class="o">,</span> <span class="n">e</span><span class="o">);</span>
            <span class="k">return</span> <span class="nc">Collections</span><span class="o">.</span><span class="na">emptyMap</span><span class="o">();</span>
        <span class="o">}</span>
        <span class="k">for</span> <span class="o">(</span><span class="nc">String</span> <span class="n">s</span> <span class="o">:</span> <span class="n">text</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">"\n"</span><span class="o">))</span> <span class="o">{</span>
            <span class="nc">String</span><span class="o">[]</span> <span class="n">tokens</span> <span class="o">=</span> <span class="n">s</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">","</span><span class="o">);</span>
            <span class="n">answer</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">tokens</span><span class="o">[</span><span class="mi">0</span><span class="o">],</span> <span class="n">tokens</span><span class="o">[</span><span class="mi">1</span><span class="o">]);</span>
        <span class="o">}</span>
        <span class="k">return</span> <span class="n">answer</span><span class="o">;</span>
    <span class="o">}</span>
<span class="o">});</span>

<span class="n">jmsStreamer</span><span class="o">.</span><span class="na">start</span><span class="o">();</span>

<span class="c1">// on application shutdown</span>
<span class="n">jmsStreamer</span><span class="o">.</span><span class="na">stop</span><span class="o">();</span>
<span class="n">dataStreamer</span><span class="o">.</span><span class="na">close</span><span class="o">();</span></code></pre>
</div>
</div></code-tab></code-tabs>
<div class="paragraph">
<p>To use this component, you have to import the following module through your build system (Maven, Ivy, Gradle, sbt, etc.):</p>
</div>
<code-tabs><code-tab data-tab='pom.xml'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="xml"><span class="nt">&lt;dependency&gt;</span>
    <span class="nt">&lt;groupId&gt;</span>org.apache.ignite<span class="nt">&lt;/groupId&gt;</span>
    <span class="nt">&lt;artifactId&gt;</span>ignite-jms11-ext<span class="nt">&lt;/artifactId&gt;</span>
    <span class="nt">&lt;version&gt;</span>${ignite-jms11-ext.version}<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span></code></pre>
</div>
</div></code-tab></code-tabs>
</div>
</div>
        <!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->

<div class="copyright">
 © 2022 The Apache Software Foundation.<br/>
Apache, Apache Ignite, the Apache feather and the Apache Ignite logo are either registered trademarks or trademarks of The Apache Software Foundation. 

</div>

    </article>
    <!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->

<nav class="right-nav" data-swiftype-index='false'>
    <ul class="sectlevel1">
<li><a href="#overview">Overview</a></li>
<li><a href="#instantiating-jms-streamer">Instantiating JMS Streamer</a></li>
<li><a href="#example">Example</a></li>
</ul>
    <!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
      
      
<footer>
</footer>

</nav>
    
</section>
<script type='module' src='/assets/js/code-copy-to-clipboard.js' async></script>

    <script>
    // inits deep anchors -- needs to be done here because of https://www.bryanbraun.com/anchorjs/#dont-run-it-too-late 
    anchors.add('.page-docs h1, .page-docs h2, .page-docs h3:not(.discrete), .page-docs h4, .page-docs h5');
    anchors.options = {
        placement: 'right',
        visible: 'always'
    };
    </script>
</body>
<script type='module' src='/assets/js/index.js?1651672546' async></script>
</html>
