blob: f8016b35c741222845b6bb999735da3fd6949ce6 [file] [log] [blame]
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<!-- Global site tag (gtag.js) - Google Analytics -->
<!-- The loader's id parameter matches the property ID passed to gtag('config') below. -->
<script async src="https://www.googletagmanager.com/gtag/js?id=UA-61232409-1"></script>
<script>
// gtag() calls are recorded by pushing their arguments onto window.dataLayer.
window.dataLayer = window.dataLayer || [];
function gtag(){dataLayer.push(arguments);}
gtag('js', new Date());
gtag('config', 'UA-61232409-1');
</script>
<meta charset="UTF-8">
<title>Apache Camel Streamer | Ignite Documentation</title>
<link rel="canonical" href="/docs/camel/camel-streamer" />
<link rel="stylesheet" href="/assets/css/styles.css?1658382975">
<link rel="stylesheet" href="/assets/css/asciidoc-pygments.css">
<link rel="shortcut icon" href="/favicon.ico">
<meta name='viewport' content='width=device-width, height=device-height, initial-scale=1.0, minimum-scale=1.0'>
<link rel="stylesheet"
href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">
<script type="text/javascript" src="https://ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/jquery.swiftype.autocomplete.js?1658382975"></script>
<script type="text/javascript" src="/assets/js/anchor.min.js?1658382975"></script>
</head>
<body>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--header>
<button type='button' class='menu' title='Docs menu'>
<img src="/assets/images/menu-icon.svg"/>
</button>
<nav>
</nav>
<form class='search'>
<button class="search-close" type='button'><img src='/assets/images/cancel.svg'></button>
<input type="search" placeholder="Search…" id="search-input">
</form>
<button type='button' class='search-toggle'><img src='/assets/images/search.svg'></button>
<button type='button' class='top-nav-toggle'>⋮</button>
<a href="https://github.com/ignite" title='GitHub' class='github' target="_blank">
<img src="/assets/images/github-gray.svg" alt="GitHub logo">
</a>
</header-->
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<link rel="stylesheet" href="/assets/css/docs.css">
<section class='page-docs'>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<nav class='left-nav' data-swiftype-index='false'>
<li>
<a href="/docs/extensions/aws/aws" class=''>Amazon S3 IP Finder</a>
</li>
<li>
<a href="/docs/extensions/camel/camel-streamer" class=''>Apache Camel Streamer</a>
</li>
<li>
<a href="/docs/extensions/flink/flink-streamer" class=''>Apache Flink Streamer</a>
</li>
<li>
<a href="/docs/extensions/flume/flume-sink" class=''>Apache Flume Sink</a>
</li>
<li>
<a href="/docs/extensions/azure/azure" class=''>Apache Ignite Azure Module</a>
</li>
<li>
<a href="/docs/extensions/gce/gce" class=''>Apache Ignite GCE Module</a>
</li>
<li>
<a href="/docs/extensions/pub-sub/pub-sub" class=''>Apache Ignite Pub/Sub Module</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-boot" class=''>Apache Ignite and Spring Boot</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-data" class=''>Apache Ignite and Spring Data</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-caching" class=''>Apache Ignite and Spring Cache</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-sessions" class=''>Apache Ignite and Spring Session</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-tx" class=''>Apache Ignite and Spring Transactions</a>
</li>
<li>
<a href="/docs/extensions/kafka/kafka-streamer" class=''>Apache Kafka Streamer</a>
</li>
<li>
<a href="/docs/extensions/storm/storm-streamer" class=''>Apache Storm Streamer</a>
</li>
<li>
<a href="/docs/extensions/cdc/change-data-capture-extensions" class=''>Change Data Capture Extension</a>
</li>
<li>
<a href="/docs/extensions/jms/jms-streamer" class=''>JMS Streamer</a>
</li>
<li>
<a href="/docs/extensions/mqtt/mqtt-streamer" class=''>MQTT Streamer</a>
</li>
<li>
<a href="/docs/extensions/perf-statistics/performance-statistics" class=''>Performance Statistics Extension</a>
</li>
<li>
<a href="/docs/extensions/rocketmq/rocketmq-streamer" class=''>RocketMQ Streamer</a>
</li>
<li>
<a href="/docs/extensions/topology-validator/topology-validator" class=''>Topology Validator</a>
</li>
<li>
<a href="/docs/extensions/twitter/twitter-streamer" class=''>Twitter Streamer</a>
</li>
<li>
<a href="/docs/extensions/zeromq/zeromq-streamer" class=''>ZeroMQ Streamer</a>
</li>
<li>
<a href="/docs/extensions/zookeeper/zookeeper-ip" class=''>ZooKeeper IP Finder</a>
</li>
</nav>
<div class="left-nav__overlay"></div>
<article data-swiftype-index='true'>
<a class='edit-link' href="/_docs/camel/camel-streamer.adoc" target="_blank">Edit</a>
<h1>Apache Camel Streamer</h1>
<div class="sect1">
<h2 id="overview">Overview</h2>
<div class="sectionbody">
<div class="paragraph">
<p>This documentation page focuses on the Apache Camel streamer, which can also be thought of as a universal streamer because it
allows you to consume from any technology or protocol supported by Camel into an Ignite Cache.</p>
</div>
<div class="imageblock">
<div class="content">
<img src="/docs/images/camel-streamer.png" alt="Camel Streamer">
</div>
</div>
<div class="paragraph">
<p>With this streamer, you can ingest entries straight into an Ignite cache based on:</p>
</div>
<div class="ulist">
<ul>
<li>
<p>Calls received on a Web Service (SOAP or REST), by extracting the body or headers.</p>
</li>
<li>
<p>Listening on a TCP or UDP channel for messages.</p>
</li>
<li>
<p>The content of files received via FTP or written to the local filesystem.</p>
</li>
<li>
<p>Email messages received via POP3 or IMAP.</p>
</li>
<li>
<p>A MongoDB tailable cursor.</p>
</li>
<li>
<p>An AWS SQS queue.</p>
</li>
<li>
<p>And many others.</p>
</li>
</ul>
</div>
<div class="paragraph">
<p>This streamer supports two modes of ingestion: <strong>direct ingestion</strong> and <strong>mediated ingestion</strong>.</p>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<div class="title">Note</div>
</td>
<td class="content">
<h3 id="the-ignite-camel-component" class="discrete">The Ignite Camel Component</h3>
<div class="paragraph">
<p>There is also the <a href="https://camel.apache.org/components/latest/ignite-summary.html" target="_blank" rel="noopener">camel-ignite</a> component, if what you are looking for is
to interact with Ignite Caches, Compute, Events, Messaging, etc. from within a Camel route.</p>
</div>
</td>
</tr>
</table>
</div>
</div>
</div>
<div class="sect1">
<h2 id="maven-dependency">Maven Dependency</h2>
<div class="sectionbody">
<div class="paragraph">
<p>To make use of the <code>ignite-camel-ext</code> streamer, you need to add the following dependency:</p>
</div>
<code-tabs><code-tab data-tab='pom.xml'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="xml"><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.ignite<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>ignite-camel-ext<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>${ignite-camel-ext.version}<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span></code></pre>
</div>
</div></code-tab></code-tabs>
<div class="paragraph">
<p>It will also pull in <code>camel-core</code> as a transitive dependency.</p>
</div>
</div>
</div>
<div class="sect1">
<h2 id="direct-ingestion">Direct Ingestion</h2>
<div class="sectionbody">
<div class="paragraph">
<p>You can consume from any Camel endpoint straight into Ignite, with the help of a
Tuple Extractor. We call this <strong>direct ingestion</strong>.</p>
</div>
<div class="paragraph">
<p>Here is a code sample:</p>
</div>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="c1">// Start Apache Ignite.</span>
<span class="nc">Ignite</span> <span class="n">ignite</span> <span class="o">=</span> <span class="nc">Ignition</span><span class="o">.</span><span class="na">start</span><span class="o">();</span>
<span class="c1">// Create a streamer pipe which ingests into the 'mycache' cache.</span>
<span class="nc">IgniteDataStreamer</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;</span> <span class="n">pipe</span> <span class="o">=</span> <span class="n">ignite</span><span class="o">.</span><span class="na">dataStreamer</span><span class="o">(</span><span class="s">"mycache"</span><span class="o">);</span>
<span class="c1">// Create a Camel streamer and connect it.</span>
<span class="nc">CamelStreamer</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;</span> <span class="n">streamer</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">CamelStreamer</span><span class="o">&lt;&gt;();</span>
<span class="n">streamer</span><span class="o">.</span><span class="na">setIgnite</span><span class="o">(</span><span class="n">ignite</span><span class="o">);</span>
<span class="n">streamer</span><span class="o">.</span><span class="na">setStreamer</span><span class="o">(</span><span class="n">pipe</span><span class="o">);</span>
<span class="c1">// This endpoint starts a Jetty server and consumes from all network interfaces on port 8080 and context path /ignite.</span>
<span class="n">streamer</span><span class="o">.</span><span class="na">setEndpointUri</span><span class="o">(</span><span class="s">"jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST"</span><span class="o">);</span>
<span class="c1">// This is the tuple extractor. We'll assume each message contains only one tuple.</span>
<span class="c1">// If your message contains multiple tuples, use a StreamMultipleTupleExtractor.</span>
<span class="c1">// The Tuple Extractor receives the Camel Exchange and returns a Map.Entry&lt;?,?&gt; with the key and value.</span>
<span class="n">streamer</span><span class="o">.</span><span class="na">setSingleTupleExtractor</span><span class="o">(</span><span class="k">new</span> <span class="nc">StreamSingleTupleExtractor</span><span class="o">&lt;</span><span class="nc">Exchange</span><span class="o">,</span> <span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span> <span class="kd">public</span> <span class="nc">Map</span><span class="o">.</span><span class="na">Entry</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;</span> <span class="nf">extract</span><span class="o">(</span><span class="nc">Exchange</span> <span class="n">exchange</span><span class="o">)</span> <span class="o">{</span>
<span class="nc">String</span> <span class="n">stationId</span> <span class="o">=</span> <span class="n">exchange</span><span class="o">.</span><span class="na">getIn</span><span class="o">().</span><span class="na">getHeader</span><span class="o">(</span><span class="s">"X-StationId"</span><span class="o">,</span> <span class="nc">String</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="nc">String</span> <span class="n">temperature</span> <span class="o">=</span> <span class="n">exchange</span><span class="o">.</span><span class="na">getIn</span><span class="o">().</span><span class="na">getBody</span><span class="o">(</span><span class="nc">String</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="k">return</span> <span class="k">new</span> <span class="nc">GridMapEntry</span><span class="o">&lt;&gt;(</span><span class="n">stationId</span><span class="o">,</span> <span class="n">temperature</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">});</span>
<span class="c1">// Start the streamer.</span>
<span class="n">streamer</span><span class="o">.</span><span class="na">start</span><span class="o">();</span></code></pre>
</div>
</div></code-tab></code-tabs>
</div>
</div>
<div class="sect1">
<h2 id="mediated-ingestion">Mediated Ingestion</h2>
<div class="sectionbody">
<div class="paragraph">
<p>For more sophisticated scenarios, you can also create a Camel route that performs complex processing on incoming messages, e.g. transformations, validations, splitting, aggregating, idempotency, resequencing, enrichment, etc. and <strong>ingest only the result into the Ignite cache</strong>.</p>
</div>
<div class="paragraph">
<p>We call this <strong>mediated ingestion</strong>.</p>
</div>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="c1">// Create a CamelContext with a custom route that will:</span>
<span class="c1">// (1) consume from our Jetty endpoint.</span>
<span class="c1">// (2) transform incoming JSON into a Java object with Jackson.</span>
<span class="c1">// (3) use JSR 303 Bean Validation to validate the object.</span>
<span class="c1">// (4) dispatch to the direct:ignite.ingest endpoint, where the streamer is consuming from.</span>
<span class="nc">CamelContext</span> <span class="n">context</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">DefaultCamelContext</span><span class="o">();</span>
<span class="n">context</span><span class="o">.</span><span class="na">addRoutes</span><span class="o">(</span><span class="k">new</span> <span class="nc">RouteBuilder</span><span class="o">()</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">configure</span><span class="o">()</span> <span class="kd">throws</span> <span class="nc">Exception</span> <span class="o">{</span>
<span class="n">from</span><span class="o">(</span><span class="s">"jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST"</span><span class="o">)</span>
<span class="o">.</span><span class="na">unmarshal</span><span class="o">().</span><span class="na">json</span><span class="o">(</span><span class="nc">JsonLibrary</span><span class="o">.</span><span class="na">Jackson</span><span class="o">)</span>
<span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">"bean-validator:validate"</span><span class="o">)</span>
<span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">"direct:ignite.ingest"</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">});</span>
<span class="c1">// Remember our Streamer is now consuming from the Direct endpoint above.</span>
<span class="n">streamer</span><span class="o">.</span><span class="na">setEndpointUri</span><span class="o">(</span><span class="s">"direct:ignite.ingest"</span><span class="o">);</span></code></pre>
</div>
</div></code-tab></code-tabs>
</div>
</div>
<div class="sect1">
<h2 id="setting-a-response">Setting a Response</h2>
<div class="sectionbody">
<div class="paragraph">
<p>By default, the response sent back to the caller (if it is a synchronous endpoint) is simply an echo of the original request.
If you want to customize the response, set a Camel <code>Processor</code> as a <code>responseProcessor</code>:</p>
</div>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="n">streamer</span><span class="o">.</span><span class="na">setResponseProcessor</span><span class="o">(</span><span class="k">new</span> <span class="nc">Processor</span><span class="o">()</span> <span class="o">{</span>
<span class="nd">@Override</span> <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span><span class="nc">Exchange</span> <span class="n">exchange</span><span class="o">)</span> <span class="kd">throws</span> <span class="nc">Exception</span> <span class="o">{</span>
<span class="n">exchange</span><span class="o">.</span><span class="na">getOut</span><span class="o">().</span><span class="na">setHeader</span><span class="o">(</span><span class="nc">Exchange</span><span class="o">.</span><span class="na">HTTP_RESPONSE_CODE</span><span class="o">,</span> <span class="mi">200</span><span class="o">);</span>
<span class="n">exchange</span><span class="o">.</span><span class="na">getOut</span><span class="o">().</span><span class="na">setBody</span><span class="o">(</span><span class="s">"OK"</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">});</span></code></pre>
</div>
</div></code-tab></code-tabs>
</div>
</div>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div class="copyright">
© 2022 The Apache Software Foundation.<br/>
Apache, Apache Ignite, the Apache feather and the Apache Ignite logo are either registered trademarks or trademarks of The Apache Software Foundation.
</div>
</article>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<nav class="right-nav" data-swiftype-index='false'>
<ul class="sectlevel1">
<li><a href="#overview">Overview</a></li>
<li><a href="#maven-dependency">Maven Dependency</a></li>
<li><a href="#direct-ingestion">Direct Ingestion</a></li>
<li><a href="#mediated-ingestion">Mediated Ingestion</a></li>
<li><a href="#setting-a-response">Setting a Response</a></li>
</ul>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<footer>
</footer>
</nav>
</section>
<script type='module' src='/assets/js/code-copy-to-clipboard.js' async></script>
<script>
// Configure AnchorJS before calling add(): options assigned after add() are not
// applied to the anchors that add() has already generated.
anchors.options = {
placement: 'right',
visible: 'always'
};
// inits deep anchors -- needs to be done here because of https://www.bryanbraun.com/anchorjs/#dont-run-it-too-late
// h3 headings marked .discrete are intentionally excluded by the selector.
anchors.add('.page-docs h1, .page-docs h2, .page-docs h3:not(.discrete), .page-docs h4, .page-docs h5');
</script>
<script type='module' src='/assets/js/index.js?1658382975' async></script>
</body>
</html>