blob: 218529120fc9b461f0ac39b3d2f4551098b803ef [file] [log] [blame]
Apache Ignite Kafka Streamer Module
-----------------------------------
Apache Ignite Kafka Streamer module provides streaming from Kafka to Ignite cache.
There are two ways this can be achieved:
- importing Kafka Streamer module in your Maven project and instantiate KafkaStreamer for data streaming;
- using Kafka Connect functionality.
Below are the details.
## Importing Ignite Kafka Streamer Module In Maven Project
If you are using Maven to manage dependencies of your project, you can add Kafka module
dependency like this (replace '${ignite.version}' with actual Ignite version you are
interested in):
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
...
<dependencies>
...
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-kafka-ext</artifactId>
<version>${ignite-kafka-ext.version}</version>
</dependency>
...
</dependencies>
...
</project>
## Streaming Data to Ignite via Kafka Connect
Sink Connector will help you export data from Kafka to Ignite cache. It polls data from Kafka topics and writes it to the user-specified cache.
For more information on Kafka Connect, see [Kafka Documentation](http://kafka.apache.org/documentation.html#connect).
Connector can be found in 'optional/ignite-kafka.' It and its dependencies have to be on the classpath of a Kafka running instance,
as described in the following subsection.
### Setting up and Running
1. Put the following jar files on Kafka's classpath
- ignite-kafka-connect-x.x.x-SNAPSHOT.jar
- ignite-core-x.x.x-SNAPSHOT.jar
- ignite-spring-x.x.x-SNAPSHOT.jar
- cache-api-1.0.0.jar
- spring-aop-4.1.0.RELEASE.jar
- spring-beans-4.1.0.RELEASE.jar
- spring-context-4.1.0.RELEASE.jar
- spring-core-4.1.0.RELEASE.jar
- spring-expression-4.1.0.RELEASE.jar
- commons-logging-1.1.1.jar
2. Prepare worker configurations, e.g.,
```
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
```
3. Prepare connector configurations, e.g.,
```
# connector
name=string-ignite-connector
connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
tasks.max=2
topics=testTopic1,testTopic2
# converter (optional)
singleTupleExtractorCls=my.company.MyTupleExtractor
# cache
cacheName=cache1
cacheAllowOverwrite=true
igniteCfg=/some-path/ignite.xml
```
where 'cacheName' is the name of the cache you specify in '/some-path/ignite.xml' and the data from 'testTopic1,testTopic2'
will be pulled and stored. 'cacheAllowOverwrite' is set to true if you want to enable overwriting existing values in cache.
You can also set 'cachePerNodeDataSize' and 'cachePerNodeParOps' to adjust per-node buffer and the maximum number
of parallel stream operations for a single node.
If you need to create an Ignite key from a Kafka value, implement StreamSingleTupleExtractor and specify it as 'singleTupleExtractorCls'.
See example-ignite.xml in tests for a simple cache configuration file example.
4. Start connector, for instance, as follows,
```
./bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
```
## Checking the Flow
To perform a very basic functionality check, you can do the following,
1. Start Zookeeper
```
bin/zookeeper-server-start.sh config/zookeeper.properties
```
2. Start Kafka server
```
bin/kafka-server-start.sh config/server.properties
```
3. Provide some data input to the Kafka server
```
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=,
k1,v1
```
4. Start the connector. For example,
```
./bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
```
5. Check the value is in the cache. For example, via REST,
```
http://node1:8080/ignite?cmd=size&cacheName=cache1
```
## Streaming Cache Event Data to Kafka via Kafka Connect
Source connector enables listening to Ignite cache events and, upon filtering, stream them to Kafka.
Connector can be found in 'optional/ignite-kafka.' It and its dependencies have to be on the classpath of a Kafka running instance,
as described in the following subsection.
### Setting up and Running
1. Put the following jar files on Kafka's classpath
- ignite-kafka-connect-x.x.x-SNAPSHOT.jar
- ignite-core-x.x.x-SNAPSHOT.jar
- cache-api-1.0.0.jar
- ignite-spring-x.x.x-SNAPSHOT.jar
- spring-aop-4.1.0.RELEASE.jar
- spring-beans-4.1.0.RELEASE.jar
- spring-context-4.1.0.RELEASE.jar
- spring-core-4.1.0.RELEASE.jar
- spring-expression-4.1.0.RELEASE.jar
- commons-logging-1.1.1.jar
2. Prepare worker configurations, e.g.,
```
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
```
Note that the current implementation ignores key and schema of Kafka Connect, and stores marshalled cache events
using org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter.
3. Prepare connector configurations, e.g.,
```
# connector
name=ignite-src-connector
connector.class=org.apache.ignite.stream.kafka.connect.IgniteSourceConnector
tasks.max=2
# cache
topicNames=testTopic1,testTopic2
cacheEvts=put,removed
## if you decide to filter remotely (recommended)
cacheFilterCls=MyFilter
cacheName=cache1
igniteCfg=/some-path/ignite.xml
```
where 'cacheName' is the name of the cache you specify in '/some-path/ignite.xml' and the data from 'testTopic1,testTopic2'
will be pulled and stored. Also consider using 'evtBufferSize' and 'evtBatchSize' for tuning the internal queue
used to safely transfer data from Ignite cache to Kafka.
The following cache events can be specified in the connector configurations:
- CREATED
- DESTROYED
- PUT
- READ
- REMOVED
- LOCKED
- UNLOCKED
- SWAPPED
- UNSWAPPED
- EXPIRED
For a simple cache configuration file example, see example-ignite.xml in tests.
4. Start the connector, as described in [Kafka Documentation](http://kafka.apache.org/documentation.html#connect).