blob: 1bc1d694297868e785f9e6024aba1b7afbec3347 [file] [log] [blame]
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<!-- Global site tag (gtag.js) - Google Analytics -->
<script async src="https://www.googletagmanager.com/gtag/js?id=UA-1382082-1"></script>
<script>
// Google Analytics (gtag.js) bootstrap: defines a global gtag() that queues
// commands on window.dataLayer, presumably consumed by the async gtag.js
// loader in the script tag above.
// gtag() pushes the `arguments` object itself (not an array copy), exactly as
// in Google's published snippet -- keep these lines verbatim.
// NOTE(review): the loader URL requests id UA-1382082-1, but the config call
// below uses UA-61232409-1 -- confirm which property is intended.
// NOTE(review): Universal Analytics (UA-*) properties no longer process data;
// consider migrating to a GA4 (G-*) measurement id.
window.dataLayer = window.dataLayer || [];
function gtag(){dataLayer.push(arguments);}
gtag('js', new Date());
gtag('config', 'UA-61232409-1');
</script>
<meta charset="UTF-8">
<title>Apache Kafka Streamer | Ignite Documentation</title>
<link rel="canonical" href="/docs/kafka/kafka-streamer" />
<link rel="stylesheet" href="/assets/css/styles.css?1658382975">
<link rel="stylesheet" href="/assets/css/asciidoc-pygments.css">
<link rel="shortcut icon" href="/favicon.ico">
<meta name='viewport' content='width=device-width, height=device-height, initial-scale=1.0, minimum-scale=1.0'>
<link rel="stylesheet"
href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">
<script type="text/javascript" src="https://ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/jquery.swiftype.autocomplete.js?1658382975"></script>
<script type="text/javascript" src="/assets/js/anchor.min.js?1658382975"></script>
</head>
<body>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--header>
<button type='button' class='menu' title='Docs menu'>
<img src="/assets/images/menu-icon.svg"/>
</button>
<nav>
</nav>
<form class='search'>
<button class="search-close" type='button'><img src='/assets/images/cancel.svg'></button>
<input type="search" placeholder="Search…" id="search-input">
</form>
<button type='button' class='search-toggle'><img src='/assets/images/search.svg'></button>
<button type='button' class='top-nav-toggle'>⋮</button>
<a href="https://github.com/ignite" title='GitHub' class='github' target="_blank">
<img src="/assets/images/github-gray.svg" alt="GitHub logo">
</a>
</header-->
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<link rel="stylesheet" href="/assets/css/docs.css">
<section class='page-docs'>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<nav class='left-nav' data-swiftype-index='false'>
<li>
<a href="/docs/extensions/aws/aws" class=''>Amazon S3 IP Finder</a>
</li>
<li>
<a href="/docs/extensions/camel/camel-streamer" class=''>Apache Camel Streamer</a>
</li>
<li>
<a href="/docs/extensions/flink/flink-streamer" class=''>Apache Flink Streamer</a>
</li>
<li>
<a href="/docs/extensions/flume/flume-sink" class=''>Apache Flume Sink</a>
</li>
<li>
<a href="/docs/extensions/azure/azure" class=''>Apache Ignite Azure Module</a>
</li>
<li>
<a href="/docs/extensions/gce/gce" class=''>Apache Ignite GCE Module</a>
</li>
<li>
<a href="/docs/extensions/pub-sub/pub-sub" class=''>Apache Ignite Pub/Sub Module</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-boot" class=''>Apache Ignite and Spring Boot</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-data" class=''>Apache Ignite and Spring Data</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-caching" class=''>Apache Ignite and Spring Cache</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-sessions" class=''>Apache Ignite and Spring Session</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-tx" class=''>Apache Ignite and Spring Transactions</a>
</li>
<li>
<a href="/docs/extensions/kafka/kafka-streamer" class=''>Apache Kafka Streamer</a>
</li>
<li>
<a href="/docs/extensions/storm/storm-streamer" class=''>Apache Storm Streamer</a>
</li>
<li>
<a href="/docs/extensions/cdc/change-data-capture-extensions" class=''>Change Data Capture Extension</a>
</li>
<li>
<a href="/docs/extensions/jms/jms-streamer" class=''>JMS Streamer</a>
</li>
<li>
<a href="/docs/extensions/mqtt/mqtt-streamer" class=''>MQTT Streamer</a>
</li>
<li>
<a href="/docs/extensions/perf-statistics/performance-statistics" class=''>Performance Statistics Extension</a>
</li>
<li>
<a href="/docs/extensions/rocketmq/rocketmq-streamer" class=''>RocketMQ Streamer</a>
</li>
<li>
<a href="/docs/extensions/topology-validator/topology-validator" class=''>Topology Validator</a>
</li>
<li>
<a href="/docs/extensions/twitter/twitter-streamer" class=''>Twitter Streamer</a>
</li>
<li>
<a href="/docs/extensions/zeromq/zeromq-streamer" class=''>ZeroMQ Streamer</a>
</li>
<li>
<a href="/docs/extensions/zookeeper/zookeeper-ip" class=''>ZooKeeper IP Finder</a>
</li>
</nav>
<div class="left-nav__overlay"></div>
<article data-swiftype-index='true'>
<a class='edit-link' href="/_docs/kafka/kafka-streamer.adoc" target="_blank">Edit</a>
<h1>Apache Kafka Streamer</h1>
<div class="sect1">
<h2 id="overview">Overview</h2>
<div class="sectionbody">
<div class="paragraph">
<p>Apache Ignite Kafka Streamer module provides streaming from Kafka to Ignite cache.
Either of the following two methods can be used to achieve such streaming:</p>
</div>
<div class="ulist">
<ul>
<li>
<p>using Kafka Connect functionality with Ignite sink</p>
</li>
<li>
<p>importing the Kafka Streamer module in your Maven project and instantiating <code>KafkaStreamer</code> for data streaming</p>
</li>
</ul>
</div>
</div>
</div>
<div class="sect1">
<h2 id="streaming-data-via-kafka-connect">Streaming Data via Kafka Connect</h2>
<div class="sectionbody">
<div class="paragraph">
<p><code>IgniteSinkConnector</code> exports data from Kafka to an Ignite cache by polling data from Kafka topics and writing
it to the specified cache. The connector can be found in the <code>ignite-kafka</code> module. The connector and its dependencies
must be on the classpath of a running Kafka instance, as described in the following subsection. <em>For more information
on Kafka Connect, see the <a href="https://kafka.apache.org/documentation.html#connect" target="_blank" rel="noopener">Kafka Documentation</a>.</em></p>
</div>
<div class="sect2">
<h3 id="setting-up-and-running">Setting up and Running</h3>
<div class="olist arabic">
<ol class="arabic">
<li>
<p>Add the <code>IGNITE_HOME/libs/ignite-kafka</code> module and its dependencies to the classpath of the Kafka Connect worker (the Kafka instance that runs the connector).</p>
</li>
<li>
<p>Prepare worker configurations, e.g.,</p>
<code-tabs><code-tab data-tab='Configuration'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="yaml"><span class="s">bootstrap.servers=localhost:9092</span>
<span class="s">key.converter=org.apache.kafka.connect.storage.StringConverter</span>
<span class="s">value.converter=org.apache.kafka.connect.storage.StringConverter</span>
<span class="s">key.converter.schemas.enable=false</span>
<span class="s">value.converter.schemas.enable=false</span>
<span class="s">internal.key.converter=org.apache.kafka.connect.storage.StringConverter</span>
<span class="s">internal.value.converter=org.apache.kafka.connect.storage.StringConverter</span>
<span class="s">internal.key.converter.schemas.enable=false</span>
<span class="s">internal.value.converter.schemas.enable=false</span>
<span class="s">offset.storage.file.filename=/tmp/connect.offsets</span>
<span class="s">offset.flush.interval.ms=10000</span></code></pre>
</div>
</div></code-tab></code-tabs>
</li>
<li>
<p>Prepare connector configurations, e.g.,</p>
<code-tabs><code-tab data-tab='Configuration'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="yaml"><span class="c1"># connector</span>
<span class="s">name=my-ignite-connector</span>
<span class="s">connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector</span>
<span class="s">tasks.max=2</span>
<span class="s">topics=someTopic1,someTopic2</span>
<span class="c1"># cache</span>
<span class="s">cacheName=myCache</span>
<span class="s">cacheAllowOverwrite=true</span>
<span class="s">igniteCfg=/some-path/ignite.xml</span>
<span class="s">singleTupleExtractorCls=my.company.MyExtractor</span></code></pre>
</div>
</div></code-tab></code-tabs>
<div class="ulist">
<ul>
<li>
<p>where <code>cacheName</code> is the name of the cache, defined in <code>/some-path/ignite.xml</code>, into which the data from the <code>someTopic1</code> and <code>someTopic2</code> topics
will be pulled and stored.</p>
</li>
<li>
<p><code>cacheAllowOverwrite</code> can be set to <code>true</code> if you want to enable overwriting existing values in the cache.</p>
</li>
<li>
<p>If you need to parse the incoming data and decide on your new key and value, implement a <code>StreamSingleTupleExtractor</code> and specify its class name as <code>singleTupleExtractorCls</code>.</p>
</li>
<li>
<p>You can also set <code>cachePerNodeDataSize</code> and <code>cachePerNodeParOps</code> to adjust the per-node buffer size and the maximum number of parallel stream operations for a single node.</p>
</li>
</ul>
</div>
</li>
<li>
<p>Start the connector, for instance, in standalone mode as follows:</p>
<code-tabs><code-tab data-tab='Shell'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="shell">bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties</code></pre>
</div>
</div></code-tab></code-tabs>
</li>
</ol>
</div>
</div>
<div class="sect2">
<h3 id="checking-the-flow">Checking the Flow</h3>
<div class="paragraph">
<p>To perform a very basic functionality check, do the following:</p>
</div>
<div class="olist arabic">
<ol class="arabic">
<li>
<p>Start Zookeeper</p>
<code-tabs><code-tab data-tab='Shell'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="shell">bin/zookeeper-server-start.sh config/zookeeper.properties</code></pre>
</div>
</div></code-tab></code-tabs>
</li>
<li>
<p>Start Kafka server</p>
<code-tabs><code-tab data-tab='Shell'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="shell">bin/kafka-server-start.sh config/server.properties</code></pre>
</div>
</div></code-tab></code-tabs>
</li>
<li>
<p>Provide some data input to the Kafka server: start the console producer and type a record such as <code>k1,v1</code> (key and value separated by a comma)</p>
<code-tabs><code-tab data-tab='Shell'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="shell">bin/kafka-console-producer.sh <span class="nt">--broker-list</span> localhost:9092 <span class="nt">--topic</span> <span class="nb">test</span> <span class="nt">--property</span> parse.key<span class="o">=</span><span class="nb">true</span> <span class="nt">--property</span> key.separator<span class="o">=</span>,
k1,v1</code></pre>
</div>
</div></code-tab></code-tabs>
</li>
<li>
<p>Start the connector</p>
<code-tabs><code-tab data-tab='Shell'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="shell">bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties</code></pre>
</div>
</div></code-tab></code-tabs>
</li>
<li>
<p>Check that the value is in the cache, for example, via the REST API:</p>
<code-tabs><code-tab data-tab='Shell'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="shell">http://node1:8080/ignite?cmd<span class="o">=</span>size&amp;cacheName<span class="o">=</span>cache1</code></pre>
</div>
</div></code-tab></code-tabs>
</li>
</ol>
</div>
</div>
</div>
</div>
<div class="sect1">
<h2 id="streaming-data-with-ignite-kafka-streamer-module">Streaming data with Ignite Kafka Streamer Module</h2>
<div class="sectionbody">
<div class="paragraph">
<p>If you are using Maven to manage the dependencies of your project, first add the Kafka Streamer module
dependency as shown below (replace <code>${ignite-kafka-ext.version}</code> with the actual Ignite Kafka Extension version you are interested in):</p>
</div>
<code-tabs><code-tab data-tab='pom.xml'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="xml"><span class="nt">&lt;project</span> <span class="na">xmlns=</span><span class="s">"http://maven.apache.org/POM/4.0.0"</span>
<span class="na">xmlns:xsi=</span><span class="s">"http://www.w3.org/2001/XMLSchema-instance"</span>
<span class="na">xsi:schemaLocation=</span><span class="s">"http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"</span><span class="nt">&gt;</span>
...
<span class="nt">&lt;dependencies&gt;</span>
...
<span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.ignite<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>ignite-kafka-ext<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>${ignite-kafka-ext.version}<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span>
...
<span class="nt">&lt;/dependencies&gt;</span>
...
<span class="nt">&lt;/project&gt;</span></code></pre>
</div>
</div></code-tab></code-tabs>
<div class="paragraph">
<p>Given a cache with <code>String</code> keys and <code>String</code> values, the streamer can be started as follows:</p>
</div>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="nc">KafkaStreamer</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;</span> <span class="n">kafkaStreamer</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">KafkaStreamer</span><span class="o">&lt;&gt;();</span>
<span class="nc">IgniteDataStreamer</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;</span> <span class="n">stmr</span> <span class="o">=</span> <span class="n">ignite</span><span class="o">.</span><span class="na">dataStreamer</span><span class="o">(</span><span class="s">"myCache"</span><span class="o">));</span>
<span class="c1">// allow overwriting cache data</span>
<span class="n">stmr</span><span class="o">.</span><span class="na">allowOverwrite</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
<span class="n">kafkaStreamer</span><span class="o">.</span><span class="na">setIgnite</span><span class="o">(</span><span class="n">ignite</span><span class="o">);</span>
<span class="n">kafkaStreamer</span><span class="o">.</span><span class="na">setStreamer</span><span class="o">(</span><span class="n">stmr</span><span class="o">);</span>
<span class="c1">// set the topic</span>
<span class="n">kafkaStreamer</span><span class="o">.</span><span class="na">setTopic</span><span class="o">(</span><span class="n">someKafkaTopic</span><span class="o">);</span>
<span class="c1">// set the number of threads to process Kafka streams</span>
<span class="n">kafkaStreamer</span><span class="o">.</span><span class="na">setThreads</span><span class="o">(</span><span class="mi">4</span><span class="o">);</span>
<span class="c1">// set Kafka consumer configurations</span>
<span class="n">kafkaStreamer</span><span class="o">.</span><span class="na">setConsumerConfig</span><span class="o">(</span><span class="n">kafkaConsumerConfig</span><span class="o">);</span>
<span class="c1">// set extractor</span>
<span class="n">kafkaStreamer</span><span class="o">.</span><span class="na">setSingleTupleExtractor</span><span class="o">(</span><span class="n">strExtractor</span><span class="o">);</span>
<span class="n">kafkaStreamer</span><span class="o">.</span><span class="na">start</span><span class="o">();</span>
<span class="o">...</span>
<span class="c1">// stop on shutdown</span>
<span class="n">kafkaStreamer</span><span class="o">.</span><span class="na">stop</span><span class="o">();</span>
<span class="n">strm</span><span class="o">.</span><span class="na">close</span><span class="o">();</span></code></pre>
</div>
</div></code-tab></code-tabs>
<div class="paragraph">
<p>For the detailed information on Kafka consumer properties, refer to <a href="http://kafka.apache.org/documentation.html" class="bare">http://kafka.apache.org/documentation.html</a></p>
</div>
</div>
</div>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div class="copyright">
© 2022 The Apache Software Foundation.<br/>
Apache, Apache Ignite, the Apache feather and the Apache Ignite logo are either registered trademarks or trademarks of The Apache Software Foundation.
</div>
</article>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<nav class="right-nav" data-swiftype-index='false'>
<ul class="sectlevel1">
<li><a href="#overview">Overview</a></li>
<li><a href="#streaming-data-via-kafka-connect">Streaming Data via Kafka Connect</a>
<ul class="sectlevel2">
<li><a href="#setting-up-and-running">Setting up and Running</a></li>
<li><a href="#checking-the-flow">Checking the Flow</a></li>
</ul>
</li>
<li><a href="#streaming-data-with-ignite-kafka-streamer-module">Streaming data with Ignite Kafka Streamer Module</a></li>
</ul>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<footer>
</footer>
</nav>
</section>
<script type='module' src='/assets/js/code-copy-to-clipboard.js' async></script>
<script>
// Initialise AnchorJS deep-link anchors on the documentation headings.
// This must run inline here rather than later -- see
// https://www.bryanbraun.com/anchorjs/#dont-run-it-too-late
// AnchorJS reads anchors.options when add() is called, so the options are
// assigned first; assigned after add() they would not affect these anchors
// (previously `visible: 'always'` was silently ignored).
anchors.options = {
  placement: 'right',
  visible: 'always'
};
// h3 elements with the "discrete" class are deliberately skipped.
anchors.add('.page-docs h1, .page-docs h2, .page-docs h3:not(.discrete), .page-docs h4, .page-docs h5');
</script>
<script type='module' src='/assets/js/index.js?1658382975' async></script>
</body>
</html>