| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| = Apache Kafka Streamer |
| |
| == Overview |
| |
| The Apache Ignite Kafka Streamer module streams data from Kafka into an Ignite cache. |
| You can use either of the following two methods: |
| |
| * using Kafka Connect functionality with Ignite sink |
| * importing the Kafka Streamer module in your Maven project and instantiating KafkaStreamer for data streaming |
| |
| == Streaming Data via Kafka Connect |
| |
| `IgniteSinkConnector` exports data from Kafka to an Ignite cache by polling Kafka topics and writing the records |
| to the cache you specify. The connector can be found in the `optional/ignite-kafka` module. The connector and its dependencies |
| must be on the classpath of a running Kafka instance, as described in the following subsection. _For more information |
| on Kafka Connect, see http://kafka.apache.org/documentation.html#connect[Kafka Documentation, window=_blank]._ |
| |
| === Setting up and Running |
| |
| . Add the `IGNITE_HOME/libs/optional/ignite-kafka` module to the application classpath. |
| |
| . Prepare the worker configuration, for example: |
| + |
| [tabs] |
| -- |
| tab:Configuration[] |
| [source,properties] |
| ---- |
| bootstrap.servers=localhost:9092 |
| |
| key.converter=org.apache.kafka.connect.storage.StringConverter |
| value.converter=org.apache.kafka.connect.storage.StringConverter |
| key.converter.schemas.enable=false |
| value.converter.schemas.enable=false |
| |
| internal.key.converter=org.apache.kafka.connect.storage.StringConverter |
| internal.value.converter=org.apache.kafka.connect.storage.StringConverter |
| internal.key.converter.schemas.enable=false |
| internal.value.converter.schemas.enable=false |
| |
| offset.storage.file.filename=/tmp/connect.offsets |
| offset.flush.interval.ms=10000 |
| ---- |
| -- |
| |
| . Prepare the connector configuration, for example: |
| + |
| [tabs] |
| -- |
| tab:Configuration[] |
| [source,properties] |
| ---- |
| # connector |
| name=my-ignite-connector |
| connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector |
| tasks.max=2 |
| topics=someTopic1,someTopic2 |
| |
| # cache |
| cacheName=myCache |
| cacheAllowOverwrite=true |
| igniteCfg=/some-path/ignite.xml |
| singleTupleExtractorCls=my.company.MyExtractor |
| ---- |
| -- |
| + |
| * `cacheName` is the name of a cache defined in `/some-path/ignite.xml`; data pulled from `someTopic1,someTopic2` |
| is stored in that cache. |
| * Set `cacheAllowOverwrite` to `true` to allow existing values in the cache to be overwritten. |
| * If you need to parse the incoming data and derive your own key and value, implement a `StreamSingleTupleExtractor` and specify its class as `singleTupleExtractorCls`. |
| * You can also set `cachePerNodeDataSize` and `cachePerNodeParOps` to adjust the per-node buffer size and the maximum number of parallel stream operations for a single node. |
| |
| . Start the connector, for instance in standalone mode: |
| + |
| [tabs] |
| -- |
| tab:Shell[] |
| [source,shell] |
| ---- |
| bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties |
| ---- |
| -- |
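
The `singleTupleExtractorCls` setting described above names a class that turns one incoming message into one cache key and value. The following is only a minimal sketch of that idea, not the module's own code: `TupleExtractor` is a hypothetical local stand-in that mirrors the single-method shape of Ignite's `StreamSingleTupleExtractor` so the example compiles with the JDK alone, and `CsvTupleExtractor` and the `key,value` message format are assumptions made for illustration. A real extractor for the connector would implement the Ignite interface and receive the connector's record type rather than a plain `String`.

```java
import java.util.AbstractMap;
import java.util.Map;

public class ExtractorSketch {
    /**
     * Hypothetical stand-in for Ignite's StreamSingleTupleExtractor:
     * one message in, one key/value entry out.
     */
    interface TupleExtractor<T, K, V> {
        Map.Entry<K, V> extract(T msg);
    }

    /** Assumed message format: "key,value", split at the first comma. */
    static final class CsvTupleExtractor implements TupleExtractor<String, String, String> {
        @Override
        public Map.Entry<String, String> extract(String msg) {
            int idx = msg.indexOf(',');

            if (idx < 0)
                throw new IllegalArgumentException("Expected 'key,value' but got: " + msg);

            // Everything before the first comma is the key, the rest is the value.
            String key = msg.substring(0, idx).trim();
            String val = msg.substring(idx + 1).trim();

            return new AbstractMap.SimpleEntry<>(key, val);
        }
    }

    public static void main(String[] args) {
        Map.Entry<String, String> entry = new CsvTupleExtractor().extract("k1,v1");

        System.out.println(entry.getKey() + " -> " + entry.getValue()); // prints "k1 -> v1"
    }
}
```

Splitting at the first comma only keeps values that themselves contain commas intact; whether that suits your data depends on the message format your producers actually use.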
| |
| === Checking the Flow |
| |
| To perform a very basic functionality check, do the following: |
| |
| . Start Zookeeper |
| + |
| [tabs] |
| -- |
| tab:Shell[] |
| [source,shell] |
| ---- |
| bin/zookeeper-server-start.sh config/zookeeper.properties |
| ---- |
| -- |
| . Start Kafka server |
| + |
| [tabs] |
| -- |
| tab:Shell[] |
| [source,shell] |
| ---- |
| bin/kafka-server-start.sh config/server.properties |
| ---- |
| -- |
| . Provide some data input to the Kafka server |
| + |
| [tabs] |
| -- |
| tab:Shell[] |
| [source,shell] |
| ---- |
| bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=, |
| k1,v1 |
| ---- |
| -- |
| . Start the connector |
| + |
| [tabs] |
| -- |
| tab:Shell[] |
| [source,shell] |
| ---- |
| bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties |
| ---- |
| -- |
| . Check that the value is in the cache, for example via the REST API: |
| + |
| [tabs] |
| -- |
| tab:Shell[] |
| [source,shell] |
| ---- |
| http://node1:8080/ignite?cmd=size&cacheName=cache1 |
| ---- |
| -- |
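
The same REST check could also be issued from code. The sketch below assumes Java 11 or later (for `java.net.http`) and reuses the host `node1`, port `8080`, and cache name `cache1` from the URL above; the class and method names are hypothetical, and the response is printed as-is rather than parsed, since its exact format is not covered here.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class CacheSizeCheck {
    /** Builds the "size" command URL for the given node and cache name. */
    static URI sizeUri(String host, int port, String cacheName) {
        // Encode the cache name so unusual characters do not break the query string.
        String encoded = URLEncoder.encode(cacheName, StandardCharsets.UTF_8);

        return URI.create("http://" + host + ":" + port + "/ignite?cmd=size&cacheName=" + encoded);
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        HttpRequest request = HttpRequest.newBuilder(sizeUri("node1", 8080, "cache1"))
            .GET()
            .build();

        // Requires a reachable node with the REST endpoint enabled.
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());

        System.out.println(response.statusCode() + " " + response.body());
    }
}
```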
| |
| == Streaming Data with the Ignite Kafka Streamer Module |
| |
| If you use Maven to manage your project's dependencies, first add the Kafka Streamer module |
| dependency as follows (replace `${ignite-kafka-ext.version}` with the Ignite Kafka Extension version you want to use): |
| |
| [tabs] |
| -- |
| tab:pom.xml[] |
| [source,xml] |
| ---- |
| <project xmlns="http://maven.apache.org/POM/4.0.0" |
| xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
| xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 |
| http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
| ... |
| <dependencies> |
| ... |
| <dependency> |
| <groupId>org.apache.ignite</groupId> |
| <artifactId>ignite-kafka-ext</artifactId> |
| <version>${ignite-kafka-ext.version}</version> |
| </dependency> |
| ... |
| </dependencies> |
| ... |
| </project> |
| ---- |
| -- |
| |
| Given a cache with `String` keys and `String` values, the streamer can be started as follows: |
| |
| [tabs] |
| -- |
| tab:Java[] |
| [source,java] |
| ---- |
| KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>(); |
| |
| IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache"); |
| |
| // allow overwriting cache data |
| stmr.allowOverwrite(true); |
| |
| kafkaStreamer.setIgnite(ignite); |
| kafkaStreamer.setStreamer(stmr); |
| |
| // set the topic |
| kafkaStreamer.setTopic(someKafkaTopic); |
| |
| // set the number of threads to process Kafka streams |
| kafkaStreamer.setThreads(4); |
| |
| // set Kafka consumer configurations |
| kafkaStreamer.setConsumerConfig(kafkaConsumerConfig); |
| |
| // set extractor |
| kafkaStreamer.setSingleTupleExtractor(strExtractor); |
| |
| kafkaStreamer.start(); |
| |
| ... |
| |
| // stop on shutdown |
| kafkaStreamer.stop(); |
| |
| stmr.close(); |
| ---- |
| -- |
| |
| For detailed information on Kafka consumer properties, refer to http://kafka.apache.org/documentation.html[Kafka Documentation, window=_blank]. |
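
The `kafkaConsumerConfig` object passed to the streamer above is not defined in the snippet. One way it might be assembled is sketched below, assuming the streamer accepts a `java.util.Properties` instance. The property keys are standard Kafka consumer settings; the class name, method name, broker address, and group id are illustrative assumptions, not values required by the module.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    /** Builds a basic consumer configuration for String keys and values. */
    static Properties consumerConfig(String bootstrapServers, String groupId) {
        Properties props = new Properties();

        // Brokers to connect to and the consumer group the streamer threads join.
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);

        // Deserializers matching a topic that carries String keys and values.
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Start from the oldest available record when the group has no committed offset.
        props.setProperty("auto.offset.reset", "earliest");

        return props;
    }

    public static void main(String[] args) {
        // Hypothetical values; replace with your own broker list and group id.
        Properties kafkaConsumerConfig = consumerConfig("localhost:9092", "ignite-streamer");

        kafkaConsumerConfig.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```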