IGNITE-13394 Migrate Kafka module to ignite-extensions - Fixes #23.
Signed-off-by: samaitra <saikat.maitra@gmail.com>
diff --git a/modules/kafka-ext/README.txt b/modules/kafka-ext/README.txt
new file mode 100644
index 0000000..348d143
--- /dev/null
+++ b/modules/kafka-ext/README.txt
@@ -0,0 +1,210 @@
+Apache Ignite Kafka Streamer Module
+-----------------------------------
+
+Apache Ignite Kafka Streamer module provides streaming from Kafka to Ignite cache.
+
+There are two ways this can be achieved:
+- importing Kafka Streamer module in your Maven project and instantiate KafkaStreamer for data streaming;
+- using Kafka Connect functionality.
+
+Below are the details.
+
+## Importing Ignite Kafka Streamer Module In Maven Project
+
+If you are using Maven to manage dependencies of your project, you can add Kafka module
+dependency like this (replace '${ignite.version}' with actual Ignite version you are
+interested in):
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ ...
+ <dependencies>
+ ...
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-kafka-ext</artifactId>
+ <version>${ignite.version}</version>
+ </dependency>
+ ...
+ </dependencies>
+ ...
+</project>
+
+
+## Streaming Data to Ignite via Kafka Connect
+
+Sink Connector will help you export data from Kafka to Ignite cache. It polls data from Kafka topics and writes it to the user-specified cache.
+For more information on Kafka Connect, see [Kafka Documentation](http://kafka.apache.org/documentation.html#connect).
+
+Connector can be found in 'optional/ignite-kafka.' It and its dependencies have to be on the classpath of a Kafka running instance,
+as described in the following subsection.
+
+### Setting up and Running
+
+1. Put the following jar files on Kafka's classpath
+- ignite-kafka-connect-x.x.x-SNAPSHOT.jar
+- ignite-core-x.x.x-SNAPSHOT.jar
+- ignite-spring-x.x.x-SNAPSHOT.jar
+- cache-api-1.0.0.jar
+- spring-aop-4.1.0.RELEASE.jar
+- spring-beans-4.1.0.RELEASE.jar
+- spring-context-4.1.0.RELEASE.jar
+- spring-core-4.1.0.RELEASE.jar
+- spring-expression-4.1.0.RELEASE.jar
+- commons-logging-1.1.1.jar
+
+2. Prepare worker configurations, e.g.,
+```
+bootstrap.servers=localhost:9092
+
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+key.converter.schemas.enable=false
+value.converter.schemas.enable=false
+
+internal.key.converter=org.apache.kafka.connect.storage.StringConverter
+internal.value.converter=org.apache.kafka.connect.storage.StringConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+
+offset.storage.file.filename=/tmp/connect.offsets
+offset.flush.interval.ms=10000
+```
+
+3. Prepare connector configurations, e.g.,
+```
+# connector
+name=string-ignite-connector
+connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
+tasks.max=2
+topics=testTopic1,testTopic2
+
+# converter (optional)
+singleTupleExtractorCls=my.company.MyTupleExtractor
+
+# cache
+cacheName=cache1
+cacheAllowOverwrite=true
+igniteCfg=/some-path/ignite.xml
+```
+where 'cacheName' is the name of the cache you specify in '/some-path/ignite.xml' and the data from 'testTopic1,testTopic2'
+will be pulled and stored. 'cacheAllowOverwrite' is set to true if you want to enable overwriting existing values in cache.
+You can also set 'cachePerNodeDataSize' and 'cachePerNodeParOps' to adjust per-node buffer and the maximum number
+of parallel stream operations for a single node.
+If you need to create an Ignite key from a Kafka value, implement StreamSingleTupleExtractor and specify it as 'singleTupleExtractorCls'.
+
+See example-ignite.xml in tests for a simple cache configuration file example.
+
+4. Start connector, for instance, as follows,
+```
+./bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
+```
+
+## Checking the Flow
+
+To perform a very basic functionality check, you can do the following,
+
+1. Start Zookeeper
+```
+bin/zookeeper-server-start.sh config/zookeeper.properties
+```
+
+2. Start Kafka server
+```
+bin/kafka-server-start.sh config/server.properties
+```
+
+3. Provide some data input to the Kafka server
+```
+bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=,
+k1,v1
+```
+
+4. Start the connector. For example,
+```
+./bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
+```
+
+5. Check the value is in the cache. For example, via REST,
+```
+http://node1:8080/ignite?cmd=size&cacheName=cache1
+```
+
+## Streaming Cache Event Data to Kafka via Kafka Connect
+
+Source connector enables listening to Ignite cache events and, upon filtering, stream them to Kafka.
+
+Connector can be found in 'optional/ignite-kafka.' It and its dependencies have to be on the classpath of a Kafka running instance,
+as described in the following subsection.
+
+### Setting up and Running
+
+1. Put the following jar files on Kafka's classpath
+- ignite-kafka-connect-x.x.x-SNAPSHOT.jar
+- ignite-core-x.x.x-SNAPSHOT.jar
+- cache-api-1.0.0.jar
+- ignite-spring-x.x.x-SNAPSHOT.jar
+- spring-aop-4.1.0.RELEASE.jar
+- spring-beans-4.1.0.RELEASE.jar
+- spring-context-4.1.0.RELEASE.jar
+- spring-core-4.1.0.RELEASE.jar
+- spring-expression-4.1.0.RELEASE.jar
+- commons-logging-1.1.1.jar
+
+2. Prepare worker configurations, e.g.,
+```
+bootstrap.servers=localhost:9092
+
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter
+key.converter.schemas.enable=false
+value.converter.schemas.enable=false
+
+internal.key.converter=org.apache.kafka.connect.storage.StringConverter
+internal.value.converter=org.apache.kafka.connect.storage.StringConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+
+offset.storage.file.filename=/tmp/connect.offsets
+offset.flush.interval.ms=10000
+```
+
+Note that the current implementation ignores key and schema of Kafka Connect, and stores marshalled cache events
+using org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter.
+
+3. Prepare connector configurations, e.g.,
+```
+# connector
+name=ignite-src-connector
+connector.class=org.apache.ignite.stream.kafka.connect.IgniteSourceConnector
+tasks.max=2
+
+# cache
+topicNames=testTopic1,testTopic2
+cacheEvts=put,removed
+## if you decide to filter remotely (recommended)
+cacheFilterCls=MyFilter
+cacheName=cache1
+igniteCfg=/some-path/ignite.xml
+```
+where 'cacheName' is the name of the cache you specify in '/some-path/ignite.xml' and the data from 'testTopic1,testTopic2'
+will be pulled and stored. Also consider using 'evtBufferSize' and 'evtBatchSize' for tuning the internal queue
+used to safely transfer data from Ignite cache to Kafka.
+
+The following cache events can be specified in the connector configurations:
+- CREATED
+- DESTROYED
+- PUT
+- READ
+- REMOVED
+- LOCKED
+- UNLOCKED
+- SWAPPED
+- UNSWAPPED
+- EXPIRED
+
+For a simple cache configuration file example, see example-ignite.xml in tests.
+
+4. Start the connector, as described in [Kafka Documentation](http://kafka.apache.org/documentation.html#connect).
diff --git a/modules/kafka-ext/licenses/apache-2.0.txt b/modules/kafka-ext/licenses/apache-2.0.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/modules/kafka-ext/licenses/apache-2.0.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/modules/kafka-ext/modules/core/src/test/config/log4j-test.xml b/modules/kafka-ext/modules/core/src/test/config/log4j-test.xml
new file mode 100755
index 0000000..b78fa9c
--- /dev/null
+++ b/modules/kafka-ext/modules/core/src/test/config/log4j-test.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN"
+ "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">
+<!--
+ Log4j configuration.
+-->
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+ <!--
+ Logs System.out messages to console.
+ -->
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <!-- Log to STDOUT. -->
+ <param name="Target" value="System.out"/>
+
+ <!-- Log from DEBUG and higher. -->
+ <param name="Threshold" value="DEBUG"/>
+
+ <!-- The default pattern: Date Priority [Category] Message\n -->
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+ </layout>
+
+ <!-- Do not log beyond INFO level. -->
+ <filter class="org.apache.log4j.varia.LevelRangeFilter">
+ <param name="levelMin" value="DEBUG"/>
+ <param name="levelMax" value="INFO"/>
+ </filter>
+ </appender>
+
+ <!--
+ Logs all System.err messages to console.
+ -->
+ <appender name="CONSOLE_ERR" class="org.apache.log4j.ConsoleAppender">
+ <!-- Log to STDERR. -->
+ <param name="Target" value="System.err"/>
+
+ <!-- Log from WARN and higher. -->
+ <param name="Threshold" value="WARN"/>
+
+ <!-- The default pattern: Date Priority [Category] Message\n -->
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+ </layout>
+ </appender>
+
+ <!--
+ Logs all output to specified file.
+ -->
+ <appender name="FILE" class="org.apache.log4j.RollingFileAppender">
+ <param name="Threshold" value="DEBUG"/>
+ <param name="File" value="${IGNITE_HOME}/work/log/ignite.log"/>
+ <param name="Append" value="true"/>
+ <param name="MaxFileSize" value="10MB"/>
+ <param name="MaxBackupIndex" value="10"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+ </layout>
+ </appender>
+
+ <!-- Disable all open source debugging. -->
+ <category name="org">
+ <level value="INFO"/>
+ </category>
+
+ <category name="org.eclipse.jetty">
+ <level value="INFO"/>
+ </category>
+
+ <!-- Default settings. -->
+ <root>
+ <!-- Print at info by default. -->
+ <level value="INFO"/>
+
+ <!-- Append to file and console. -->
+ <appender-ref ref="FILE"/>
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="CONSOLE_ERR"/>
+ </root>
+</log4j:configuration>
diff --git a/modules/kafka-ext/modules/core/src/test/config/tests.properties b/modules/kafka-ext/modules/core/src/test/config/tests.properties
new file mode 100644
index 0000000..18b3606
--- /dev/null
+++ b/modules/kafka-ext/modules/core/src/test/config/tests.properties
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Local address to bind to.
+local.ip=127.0.0.1
+
+# TCP communication port
+comm.tcp.port=30010
+
+# JBoss JNDI
+# JBoss context factory for JNDI connection establishing.
+jboss.jndi.context.factory=org.jnp.interfaces.NamingContextFactory
+# JBoss specific parameter for JNDI connection establishing.
+jboss.jndi.pkg.prefixes=org.jboss.naming:org.jnp.interfaces
+# URL of JBoss server for the 1st node.
+jboss.jndi.node1.provider.url=jnp://localhost:1199
+# URL of JBoss server for the 2nd node.
+jboss.jndi.node2.provider.url=jnp://localhost:1299
+# JBoss Discovery test max wait time.
+jboss.disco.test.wait=180000
+
+# Deployment configuration paths.
+# You will either need to override deploy.uri.dir or supply CLASSES_URI as system property.
+#
+# Path to keystore with private and public keys.
+deploy.uri.secure.keystore=@{IGNITE_HOME}/modules/tests/config/securedeploy/keystore
+# Temporary dir where deployment unit stored before deploy.
+deploy.uri.tmpdir=${java.io.tmpdir}/gg
+# Deployment dir for file scanner test with different types of GAR's.
+deploy.uri.file2.path=${java.io.tmpdir}/gg/verification/
+# URI string.
+deploy.uri.file2=file://freq=200@localhost/${java.io.tmpdir}/gg/verification/
+# File scanner URI for local file deployment.
+deploy.uri.file=file://localhost/@{IGNITE_HOME}/modules/extdata/uri/target/file/
+# FTP scanner URI for FTP deployment.
+deploy.uri.ftp=ftp://ftptest:iddqd@94.72.60.102:21/test/deployment
+# Classes scanner URI for classes deployment. Must be overridden for every user.
+deploy.uri.cls=${CLASSES_URI}
+# Http scanner URI for HTTP deployment.
+deploy.uri.http=http://fake.uri
+# Http scanner URI for secure SSL HTTPs deployment.
+deploy.uri.https=https://fake.uri
+# Directory with descriptors to construct GAR files.
+deploy.gar.descriptor.dir=modules/urideploy/src/test/java/org/apache/ignite/spi/deployment/uri/META-INF
+
+# Directory with a number of descriptors for the Ant gar task.
+ant.gar.descriptor.dir=modules/extdata/p2p/META-INF
+# Temporary directory for the Ant task resulting GAR file.
+ant.gar.tmpdir=${java.io.tmpdir}/gg
+# The same as p2p.uri.cls but without protocol
+ant.gar.srcdir=@{IGNITE_HOME}/modules/extdata/uri/target/classes/
+
+# Paths to use in URI deployment SPI tests
+urideployment.jar.uri=modules/extdata/uri/target/deploy/uri.jar
+urideployment.path.tmp=modules/extdata/uri/target/deploy_tmp/
+
+# GAR paths to use in URI deployment SPI tests
+ant.urideployment.gar.uri=file://freq=5000@localhost/EXTDATA/uri/target/deploy
+ant.urideployment.gar.file=modules/extdata/uri/target/deploy/uri.gar
+ant.urideployment.gar.libs-file=modules/extdata/uri/target/deploy2/uri-libs.gar
+ant.urideployment.gar.classes-file=modules/extdata/uri/target/deploy2/uri-classes.gar
+ant.urideployment.gar.path=modules/extdata/uri/target/deploy/
+
+# Classpath directory for GridP2PUserVersionChangeSelfTest
+ant.userversion.class.dir=@{IGNITE_HOME}/modules/tests/java/
+
+# Multicast discovery self test.
+discovery.mbeanserver.selftest.baseport=50000
+
+# TCP communication self test.
+comm.mbeanserver.selftest.baseport=50100
+
+# Kernel tests.
+grid.comm.selftest.sender.timeout=1000
+grid.comm.selftest.timeout=10000
+
+#P2P tests
+#Overwrite this property. It should point to P2P module compilation directory.
+p2p.uri.cls=file://localhost/@{IGNITE_HOME}/modules/extdata/p2p/target/classes/
+p2p.uri.cls.second=file://localhost/@{IGNITE_HOME}/modules/extdata/uri/target/classes/
+
+# AOP tests.
+# Connector port for RMI.
+connector.rmi.port=7657
+# Connector port for XFire Web Service.
+connector.ws.port=9090
+
+# Load test duration in minutes.
+load.test.duration=500
+load.test.threadnum=50
+load.test.nodenum=5
+
+# Loaders tests
+loader.self.test.config=modules/core/src/test/config/loaders/grid-cfg.xml
+loader.self.multipletest.config=modules/core/src/test/config/loaders/grid-cfg-2-grids.xml
+loader.self.test.jboss.config=modules/core/src/test/config/loaders/grid-cfg.xml
+
+# WebSphere jmx properties
+websphere.jmx.connector.host=localhost
+websphere.jmx.connector.port=8880
+websphere.jmx.connector.security=false
+websphere.jmx.username=
+websphere.jmx.pwd=
+
+# GlassFish jmx properties for GlassFish Loader
+glassfish.jmx.rmi.connector.port=8686
+glassfish.jmx.username=admin
+glassfish.jmx.password=adminadmin
+
+# Tomcat jmx properties for Servlet Loader
+tomcat.jmx.rmi.connector.port=1097
+
+# Marshaller for tests
+#marshaller.class=org.apache.ignite.marshaller.jdk.GridJdkMarshaller
+
+# EC2 configuration for tests
+#amazon.access.key=
+#amazon.secret.key=
+
+# SSH config.
+ssh.username=uname
+ssh.password=passwd
+
+# SSL tests keystore.
+ssl.keystore.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/server.jks
+ssl.keystore.password=123456
+
+# node01 signed with trust-one, node02 and node03 by trust-two, node02old is expired
+# trust-both contains both CAs
+ssl.keystore.node01.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/node01.jks
+ssl.keystore.node02.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/node02.jks
+ssl.keystore.node03.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/node03.jks
+ssl.keystore.trustone.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/trust-one.jks
+ssl.keystore.trusttwo.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/trust-two.jks
+ssl.keystore.trustboth.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/trust-both.jks
+ssl.keystore.node02old.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/node02old.jks
+
+# Hadoop home directory.
+hadoop.home=@{HADOOP_HOME}
diff --git a/modules/kafka-ext/pom.xml b/modules/kafka-ext/pom.xml
new file mode 100644
index 0000000..115d767
--- /dev/null
+++ b/modules/kafka-ext/pom.xml
@@ -0,0 +1,155 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-extensions-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <artifactId>ignite-kafka-ext</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <url>http://ignite.apache.org</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${ignite.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-api</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>${kafka.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-runtime</artifactId>
+ <version>${kafka.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>${zookeeper.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>${curator.version}</version>
+ <scope>test</scope>
+ <!-- https://github.com/confluentinc/kafka-connect-elasticsearch/issues/143 -->
+ <exclusions>
+ <exclusion>
+ <artifactId>guava</artifactId>
+ <groupId>com.google.guava</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spring</artifactId>
+ <version>${ignite.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-log4j</artifactId>
+ <version>${ignite.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>${easymock.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${ignite.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-tools</artifactId>
+ <version>${ignite.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>${kafka.version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Generate the OSGi MANIFEST.MF for this bundle. -->
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
new file mode 100644
index 0000000..02dbedd
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.stream.StreamAdapter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.WakeupException;
+
+/**
+ * Server that subscribes to topic messages from Kafka broker and streams its to key-value pairs into
+ * {@link IgniteDataStreamer} instance.
+ * <p>
+ * Uses Kafka's High Level Consumer API to read messages from Kafka.
+ */
+public class KafkaStreamer<K, V> extends StreamAdapter<ConsumerRecord, K, V> {
+ /** Default polling timeout. */
+ private static final long DFLT_TIMEOUT = 100;
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** Polling tasks executor. */
+ private ExecutorService executor;
+
+ /** Topics. */
+ private List<String> topics;
+
+ /** Number of threads. */
+ private int threads;
+
+ /** Kafka consumer config. */
+ private Properties consumerCfg;
+
+ /** Polling timeout. */
+ private long timeout = DFLT_TIMEOUT;
+
+ /** Kafka consumer tasks. */
+ private final List<ConsumerTask> consumerTasks = new ArrayList<>();
+
+ /**
+ * Sets the topic names.
+ *
+ * @param topics Topic names.
+ */
+ public void setTopic(List<String> topics) {
+ this.topics = topics;
+ }
+
+ /**
+ * Sets the threads.
+ *
+ * @param threads Number of threads.
+ */
+ public void setThreads(int threads) {
+ this.threads = threads;
+ }
+
+ /**
+ * Sets the consumer config.
+ *
+ * @param consumerCfg Consumer configuration.
+ */
+ public void setConsumerConfig(Properties consumerCfg) {
+ this.consumerCfg = consumerCfg;
+ }
+
+ /**
+ * Sets the polling timeout for Kafka tasks.
+ *
+ * @param timeout Timeout.
+ */
+ public void setTimeout(long timeout) {
+ A.ensure(timeout > 0, "timeout > 0");
+
+ this.timeout = timeout;
+ }
+
+ /**
+ * Starts streamer.
+ *
+ * @throws IgniteException If failed.
+ */
+ public void start() {
+ A.notNull(getStreamer(), "streamer");
+ A.notNull(getIgnite(), "ignite");
+ A.notNull(topics, "topics");
+ A.notNull(consumerCfg, "kafka consumer config");
+ A.ensure(threads > 0, "threads > 0");
+ A.ensure(null != getSingleTupleExtractor() || null != getMultipleTupleExtractor(),
+ "Extractor must be configured");
+
+ log = getIgnite().log();
+
+ executor = Executors.newFixedThreadPool(threads);
+
+ IntStream.range(0, threads).forEach(i -> consumerTasks.add(new ConsumerTask(consumerCfg)));
+
+ for (ConsumerTask task : consumerTasks)
+ executor.submit(task);
+ }
+
+ /**
+ * Stops streamer.
+ */
+ public void stop() {
+ for (ConsumerTask task : consumerTasks)
+ task.stop();
+
+ if (executor != null) {
+ executor.shutdown();
+
+ try {
+ if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
+ if (log.isDebugEnabled())
+ log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly.");
+ }
+ catch (InterruptedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Interrupted during shutdown, exiting uncleanly.");
+ }
+ }
+ }
+
+ /** Polling task. */
+ class ConsumerTask implements Callable<Void> {
+ /** Kafka consumer. */
+ private final KafkaConsumer<?, ?> consumer;
+
+ /** Stopped. */
+ private volatile boolean stopped;
+
+ /** Constructor. */
+ public ConsumerTask(Properties consumerCfg) {
+ this.consumer = new KafkaConsumer<>(consumerCfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ consumer.subscribe(topics);
+
+ try {
+ while (!stopped) {
+ for (ConsumerRecord record : consumer.poll(timeout)) {
+ try {
+ addMessage(record);
+ }
+ catch (Exception e) {
+ U.error(log, "Record is ignored due to an error [record = " + record + ']', e);
+ }
+ }
+ }
+ }
+ catch (WakeupException we) {
+ if (log.isInfoEnabled())
+ log.info("Consumer is being stopped.");
+ }
+ catch (KafkaException ke) {
+ log.error("Kafka error", ke);
+ }
+ finally {
+ consumer.close();
+ }
+
+ return null;
+ }
+
+ /** Stops the polling task. */
+ public void stop() {
+ stopped = true;
+
+ if (consumer != null)
+ consumer.wakeup();
+ }
+ }
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
new file mode 100644
index 0000000..3fbfd9c
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Sink connector to manage sink tasks that transfer Kafka data to Ignite grid.
+ */
+public class IgniteSinkConnector extends SinkConnector {
+ /** Sink properties. */
+ private Map<String, String> configProps;
+
+ /** Expected configurations. */
+ private static final ConfigDef CONFIG_DEF = new ConfigDef();
+
+ /** {@inheritDoc} */
+ @Override public String version() {
+ return AppInfoParser.getVersion();
+ }
+
+ /**
+ * A sink lifecycle method. Validates grid-specific sink properties.
+ *
+ * @param props Sink properties.
+ */
+ @Override public void start(Map<String, String> props) {
+ configProps = props;
+
+ try {
+ A.notNullOrEmpty(configProps.get(SinkConnector.TOPICS_CONFIG), "topics");
+ A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_NAME), "cache name");
+ A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_CFG_PATH), "path to cache config file");
+ }
+ catch (IllegalArgumentException e) {
+ throw new ConnectException("Cannot start IgniteSinkConnector due to configuration error", e);
+ }
+ }
+
+ /**
+ * Obtains a sink task class to be instantiated for feeding data into grid.
+ *
+ * @return IgniteSinkTask class.
+ */
+ @Override public Class<? extends Task> taskClass() {
+ return IgniteSinkTask.class;
+ }
+
+ /**
+ * Builds each config for <tt>maxTasks</tt> tasks.
+ *
+ * @param maxTasks Max number of tasks.
+ * @return Task configs.
+ */
+ @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
+ List<Map<String, String>> taskConfigs = new ArrayList<>();
+ Map<String, String> taskProps = new HashMap<>();
+
+ taskProps.putAll(configProps);
+
+ for (int i = 0; i < maxTasks; i++)
+ taskConfigs.add(taskProps);
+
+ return taskConfigs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public ConfigDef config() {
+ return CONFIG_DEF;
+ }
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
new file mode 100644
index 0000000..3fb511a
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Sink configuration strings.
+ */
+public class IgniteSinkConstants {
+ /** Ignite configuration file path. */
+ public static final String CACHE_CFG_PATH = "igniteCfg";
+
+ /** Cache name. */
+ public static final String CACHE_NAME = "cacheName";
+
+ /** Flag to enable overwriting existing values in cache. */
+ public static final String CACHE_ALLOW_OVERWRITE = "cacheAllowOverwrite";
+
+ /** Size of per-node buffer before data is sent to remote node. */
+ public static final String CACHE_PER_NODE_DATA_SIZE = "cachePerNodeDataSize";
+
+ /** Maximum number of parallel stream operations per node. */
+ public static final String CACHE_PER_NODE_PAR_OPS = "cachePerNodeParOps";
+
+ /** Class to transform the entry before feeding into cache. */
+ public static final String SINGLE_TUPLE_EXTRACTOR_CLASS = "singleTupleExtractorCls";
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
new file mode 100644
index 0000000..184f309
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Task to consume sequences of SinkRecords and write data to grid.
+ */
+public class IgniteSinkTask extends SinkTask {
+ /** Logger. */
+ private static final Logger log = LoggerFactory.getLogger(IgniteSinkTask.class);
+
+ /** Flag for stopped state. */
+ private static volatile boolean stopped = true;
+
+ /** Ignite grid configuration file. */
+ private static String igniteConfigFile;
+
+ /** Cache name. */
+ private static String cacheName;
+
+ /** Entry transformer. */
+ private static StreamSingleTupleExtractor<SinkRecord, Object, Object> extractor;
+
+ /** {@inheritDoc} */
+ @Override public String version() {
+ return new IgniteSinkConnector().version();
+ }
+
+ /**
+ * Initializes grid client from configPath.
+ *
+ * @param props Task properties.
+ */
+ @Override public void start(Map<String, String> props) {
+ // Each task has the same parameters -- avoid setting more than once.
+ if (cacheName != null)
+ return;
+
+ cacheName = props.get(IgniteSinkConstants.CACHE_NAME);
+ igniteConfigFile = props.get(IgniteSinkConstants.CACHE_CFG_PATH);
+
+ if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))
+ StreamerContext.getStreamer().allowOverwrite(
+ Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)));
+
+ if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE))
+ StreamerContext.getStreamer().perNodeBufferSize(
+ Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE)));
+
+ if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS))
+ StreamerContext.getStreamer().perNodeParallelOperations(
+ Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)));
+
+ if (props.containsKey(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS)) {
+ String transformerCls = props.get(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS);
+ if (transformerCls != null && !transformerCls.isEmpty()) {
+ try {
+ Class<? extends StreamSingleTupleExtractor> clazz =
+ (Class<? extends StreamSingleTupleExtractor<SinkRecord, Object, Object>>)
+ Class.forName(transformerCls);
+
+ extractor = clazz.newInstance();
+ }
+ catch (Exception e) {
+ throw new ConnectException("Failed to instantiate the provided transformer!", e);
+ }
+ }
+ }
+
+ stopped = false;
+ }
+
+ /**
+ * Buffers records.
+ *
+ * @param records Records to inject into grid.
+ */
+ @SuppressWarnings("unchecked")
+ @Override public void put(Collection<SinkRecord> records) {
+ try {
+ for (SinkRecord record : records) {
+ // Data is flushed asynchronously when CACHE_PER_NODE_DATA_SIZE is reached.
+ if (extractor != null) {
+ Map.Entry<Object, Object> entry = extractor.extract(record);
+ StreamerContext.getStreamer().addData(entry.getKey(), entry.getValue());
+ }
+ else {
+ if (record.key() != null) {
+ StreamerContext.getStreamer().addData(record.key(), record.value());
+ }
+ else {
+ log.error("Failed to stream a record with null key!");
+ }
+ }
+ }
+ }
+ catch (ConnectException e) {
+ log.error("Failed adding record", e);
+
+ throw new ConnectException(e);
+ }
+ }
+
+ /**
+ * Pushes buffered data to grid. Flush interval is configured by worker configurations.
+ *
+ * @param offsets Offset information.
+ */
+ @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
+ if (stopped)
+ return;
+
+ StreamerContext.getStreamer().flush();
+ }
+
+ /**
+ * Stops the grid client.
+ */
+ @Override public void stop() {
+ if (stopped)
+ return;
+
+ stopped = true;
+
+ StreamerContext.getIgnite().close();
+ }
+
+ /**
+ * Used by unit test to avoid restart node and valid state of the <code>stopped</code> flag.
+ *
+ * @param stopped Stopped flag.
+ */
+ protected static void setStopped(boolean stopped) {
+ IgniteSinkTask.stopped = stopped;
+
+ extractor = null;
+ }
+
+ /**
+ * Streamer context initializing grid and data streamer instances on demand.
+ */
+ public static class StreamerContext {
+ /** Constructor. */
+ private StreamerContext() {
+ }
+
+ /** Instance holder. */
+ private static class Holder {
+ private static final Ignite IGNITE = Ignition.start(igniteConfigFile);
+
+ private static final IgniteDataStreamer STREAMER = IGNITE.dataStreamer(cacheName);
+ }
+
+ /**
+ * Obtains grid instance.
+ *
+ * @return Grid instance.
+ */
+ public static Ignite getIgnite() {
+ return Holder.IGNITE;
+ }
+
+ /**
+ * Obtains data streamer instance.
+ *
+ * @return Data streamer instance.
+ */
+ public static IgniteDataStreamer getStreamer() {
+ return Holder.STREAMER;
+ }
+ }
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java
new file mode 100644
index 0000000..986888e
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceConnector;
+
+/**
+ * Source connector to manage source tasks that listens to registered Ignite grid events and forward them to Kafka.
+ *
+ * Note that only cache events are enabled for streaming.
+ */
+public class IgniteSourceConnector extends SourceConnector {
+ /** Source properties. */
+ private Map<String, String> configProps;
+
+ /** Expected configurations. */
+ private static final ConfigDef CONFIG_DEF = new ConfigDef();
+
+ /** {@inheritDoc} */
+ @Override public String version() {
+ return AppInfoParser.getVersion();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start(Map<String, String> props) {
+ try {
+ A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_NAME), "cache name");
+ A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_CFG_PATH), "path to cache config file");
+ A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_EVENTS), "Registered cache events");
+ A.notNullOrEmpty(props.get(IgniteSourceConstants.TOPIC_NAMES), "Kafka topics");
+ }
+ catch (IllegalArgumentException e) {
+ throw new ConnectException("Cannot start IgniteSourceConnector due to configuration error", e);
+ }
+
+ configProps = props;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Class<? extends Task> taskClass() {
+ return IgniteSourceTask.class;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
+ List<Map<String, String>> taskConfigs = new ArrayList<>();
+ Map<String, String> taskProps = new HashMap<>();
+
+ taskProps.putAll(configProps);
+
+ for (int i = 0; i < maxTasks; i++)
+ taskConfigs.add(taskProps);
+
+ return taskConfigs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public ConfigDef config() {
+ return CONFIG_DEF;
+ }
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConstants.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConstants.java
new file mode 100644
index 0000000..7d590e5
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConstants.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Sink configuration strings.
+ */
+public class IgniteSourceConstants {
+ /** Ignite configuration file path. */
+ public static final String CACHE_CFG_PATH = "igniteCfg";
+
+ /** Cache name. */
+ public static final String CACHE_NAME = "cacheName";
+
+ /** Events to be listened to. Names corresponds to {@link IgniteSourceTask.CacheEvt}. */
+ public static final String CACHE_EVENTS = "cacheEvts";
+
+ /** Internal buffer size. */
+ public static final String INTL_BUF_SIZE = "evtBufferSize";
+
+ /** Size of one chunk drained from the internal buffer. */
+ public static final String INTL_BATCH_SIZE = "evtBatchSize";
+
+ /** User-defined filter class. */
+ public static final String CACHE_FILTER_CLASS = "cacheFilterCls";
+
+ /** Kafka topic. */
+ public static final String TOPIC_NAMES = "topicNames";
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
new file mode 100644
index 0000000..6e3d43b
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Task to consume remote cluster cache events from the grid and inject them into Kafka.
+ * <p>
+ * Note that a task will create a bounded queue in the grid for more reliable data transfer.
+ * Queue size can be changed by {@link IgniteSourceConstants#INTL_BUF_SIZE}.
+ */
+public class IgniteSourceTask extends SourceTask {
+ /** Logger. */
+ private static final Logger log = LoggerFactory.getLogger(IgniteSourceTask.class);
+
+ /** Tasks static monitor. */
+ private static final Object lock = new Object();
+
+ /** Event buffer size. */
+ private static int evtBufSize = 100000;
+
+ /** Event buffer. */
+ private static BlockingQueue<CacheEvent> evtBuf = new LinkedBlockingQueue<>(evtBufSize);
+
+ /** Max number of events taken from the buffer at once. */
+ private static int evtBatchSize = 100;
+
+ /** Flag for stopped state. */
+ private static volatile boolean stopped = true;
+
+ /** Ignite grid configuration file. */
+ private static String igniteCfgFile;
+
+ /** Cache name. */
+ private static String cacheName;
+
+ /** Remote Listener id. */
+ private static UUID rmtLsnrId;
+
+ /** Local listener. */
+ private static TaskLocalListener locLsnr = new TaskLocalListener();
+
+ /** User-defined filter. */
+ private static IgnitePredicate<CacheEvent> filter;
+
+ /** Topic. */
+ private static String topics[];
+
+ /** Offset. */
+ private static final Map<String, Long> offset = Collections.singletonMap("offset", 0L);
+
+ /** Partition. */
+ private static final Map<String, String> srcPartition = Collections.singletonMap("cache", null);
+
+ /** {@inheritDoc} */
+ @Override public String version() {
+ return new IgniteSinkConnector().version();
+ }
+
+ /**
+ * Filtering is done remotely. Local listener buffers data for injection into Kafka.
+ *
+ * @param props Task properties.
+ */
+ @Override public void start(Map<String, String> props) {
+ synchronized (lock) {
+ // Each task has the same parameters -- avoid setting more than once.
+ // Nothing to do if the task has been already started.
+ if (!stopped)
+ return;
+
+ cacheName = props.get(IgniteSourceConstants.CACHE_NAME);
+ igniteCfgFile = props.get(IgniteSourceConstants.CACHE_CFG_PATH);
+ topics = props.get(IgniteSourceConstants.TOPIC_NAMES).split("\\s*,\\s*");
+
+ if (props.containsKey(IgniteSourceConstants.INTL_BUF_SIZE))
+ evtBufSize = Integer.parseInt(props.get(IgniteSourceConstants.INTL_BUF_SIZE));
+
+ if (props.containsKey(IgniteSourceConstants.INTL_BATCH_SIZE))
+ evtBatchSize = Integer.parseInt(props.get(IgniteSourceConstants.INTL_BATCH_SIZE));
+
+ if (props.containsKey(IgniteSourceConstants.CACHE_FILTER_CLASS)) {
+ String filterCls = props.get(IgniteSourceConstants.CACHE_FILTER_CLASS);
+ if (filterCls != null && !filterCls.isEmpty()) {
+ try {
+ Class<? extends IgnitePredicate<CacheEvent>> clazz =
+ (Class<? extends IgnitePredicate<CacheEvent>>)Class.forName(filterCls);
+
+ filter = clazz.newInstance();
+ }
+ catch (Exception e) {
+ log.error("Failed to instantiate the provided filter! " +
+ "User-enabled filtering is ignored!", e);
+ }
+ }
+ }
+
+ TaskRemoteFilter rmtLsnr = new TaskRemoteFilter(cacheName);
+
+ try {
+ int[] evts = cacheEvents(props.get(IgniteSourceConstants.CACHE_EVENTS));
+
+ rmtLsnrId = IgniteGrid.getIgnite().events(IgniteGrid.getIgnite().cluster().forCacheNodes(cacheName))
+ .remoteListen(locLsnr, rmtLsnr, evts);
+ }
+ catch (Exception e) {
+ log.error("Failed to register event listener!", e);
+
+ throw new ConnectException(e);
+ }
+ finally {
+ stopped = false;
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<SourceRecord> poll() throws InterruptedException {
+ ArrayList<SourceRecord> records = new ArrayList<>(evtBatchSize);
+ ArrayList<CacheEvent> evts = new ArrayList<>(evtBatchSize);
+
+ if (stopped)
+ return records;
+
+ try {
+ if (evtBuf.drainTo(evts, evtBatchSize) > 0) {
+ for (CacheEvent evt : evts) {
+ // schema and keys are ignored.
+ for (String topic : topics)
+ records.add(new SourceRecord(srcPartition, offset, topic, null, evt));
+ }
+
+ return records;
+ }
+ }
+ catch (IgniteException e) {
+ log.error("Error when polling event queue!", e);
+ }
+
+ // for shutdown.
+ return null;
+ }
+
+ /**
+ * Converts comma-delimited cache events strings to Ignite internal representation.
+ *
+ * @param evtPropsStr Comma-delimited cache event names.
+ * @return Ignite internal representation of cache events to be registered with the remote listener.
+ * @throws Exception If error.
+ */
+ private int[] cacheEvents(String evtPropsStr) throws Exception {
+ String[] evtStr = evtPropsStr.split("\\s*,\\s*");
+
+ if (evtStr.length == 0)
+ return EventType.EVTS_CACHE;
+
+ int[] evts = new int[evtStr.length];
+
+ try {
+ for (int i = 0; i < evtStr.length; i++)
+ evts[i] = CacheEvt.valueOf(evtStr[i].toUpperCase()).getId();
+ }
+ catch (Exception e) {
+ log.error("Failed to recognize the provided cache event!", e);
+
+ throw new Exception(e);
+ }
+ return evts;
+ }
+
+ /**
+ * Stops the grid client.
+ */
+ @Override public synchronized void stop() {
+ if (stopped)
+ return;
+
+ stopped = true;
+
+ stopRemoteListen();
+
+ IgniteGrid.getIgnite().close();
+ }
+
+ /**
+ * Stops the remote listener.
+ */
+ protected void stopRemoteListen() {
+ if (rmtLsnrId != null)
+ IgniteGrid.getIgnite().events(IgniteGrid.getIgnite().cluster().forCacheNodes(cacheName))
+ .stopRemoteListen(rmtLsnrId);
+
+ rmtLsnrId = null;
+ }
+
+ /**
+ * Used by unit test to avoid restart node and valid state of the <code>stopped</code> flag.
+ *
+ * @param stopped Stopped flag.
+ */
+ protected static void setStopped(boolean stopped) {
+ IgniteSourceTask.stopped = stopped;
+ }
+
+ /**
+ * Local listener buffering cache events to be further sent to Kafka.
+ */
+ private static class TaskLocalListener implements IgniteBiPredicate<UUID, CacheEvent> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID id, CacheEvent evt) {
+ try {
+ if (!evtBuf.offer(evt, 10, TimeUnit.MILLISECONDS))
+ log.error("Failed to buffer event {}", evt.name());
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return true;
+ }
+ }
+
+ /**
+ * Remote filter.
+ */
+ private static class TaskRemoteFilter implements IgnitePredicate<CacheEvent> {
+ /** */
+ @IgniteInstanceResource
+ Ignite ignite;
+
+ /** Cache name. */
+ private final String cacheName;
+
+ /**
+ * @param cacheName Cache name.
+ */
+ TaskRemoteFilter(String cacheName) {
+ this.cacheName = cacheName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(CacheEvent evt) {
+ Affinity<Object> affinity = ignite.affinity(cacheName);
+
+ if (affinity.isPrimary(ignite.cluster().localNode(), evt.key())) {
+ // Process this event. Ignored on backups.
+ if (filter != null && filter.apply(evt))
+ return false;
+
+ return true;
+ }
+
+ return false;
+ }
+ }
+
+ /**
+ * Grid instance initialized on demand.
+ */
+ private static class IgniteGrid {
+ /** Constructor. */
+ private IgniteGrid() {
+ // No-op.
+ }
+
+ /** Instance holder. */
+ private static class Holder {
+ /** */
+ private static final Ignite IGNITE = Ignition.start(igniteCfgFile);
+ }
+
+ /**
+ * Obtains grid instance.
+ *
+ * @return Grid instance.
+ */
+ private static Ignite getIgnite() {
+ return Holder.IGNITE;
+ }
+ }
+
+ /** Cache events available for listening. */
+ private enum CacheEvt {
+ /** */
+ CREATED(EventType.EVT_CACHE_ENTRY_CREATED),
+ /** */
+ DESTROYED(EventType.EVT_CACHE_ENTRY_DESTROYED),
+ /** */
+ PUT(EventType.EVT_CACHE_OBJECT_PUT),
+ /** */
+ READ(EventType.EVT_CACHE_OBJECT_READ),
+ /** */
+ REMOVED(EventType.EVT_CACHE_OBJECT_REMOVED),
+ /** */
+ LOCKED(EventType.EVT_CACHE_OBJECT_LOCKED),
+ /** */
+ UNLOCKED(EventType.EVT_CACHE_OBJECT_UNLOCKED),
+ /** */
+ EXPIRED(EventType.EVT_CACHE_OBJECT_EXPIRED);
+
+ /** Internal Ignite event id. */
+ private final int id;
+
+ /**
+ * Constructor.
+ *
+ * @param id Internal Ignite event id.
+ */
+ CacheEvt(int id) {
+ this.id = id;
+ }
+
+ /**
+ * Gets Ignite event id.
+ *
+ * @return Ignite event id.
+ */
+ int getId() {
+ return id;
+ }
+ }
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/package-info.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/package-info.java
new file mode 100644
index 0000000..f565c04
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains implementation of Kafka Streamer Connector.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventConverter.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventConverter.java
new file mode 100644
index 0000000..6a60bad
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventConverter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect.serialization;
+
+import java.util.Map;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.storage.Converter;
+
+/**
+ * {@link CacheEvent} converter for Connect API.
+ */
+public class CacheEventConverter implements Converter {
+ private final CacheEventDeserializer deserializer = new CacheEventDeserializer();
+
+ private final CacheEventSerializer serializer = new CacheEventSerializer();
+
+ /** {@inheritDoc} */
+ @Override public void configure(Map<String, ?> map, boolean b) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] fromConnectData(String topic, Schema schema, Object o) {
+ try {
+ return serializer.serialize(topic, (CacheEvent)o);
+ }
+ catch (SerializationException e) {
+ throw new DataException("Failed to convert to byte[] due to a serialization error", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public SchemaAndValue toConnectData(String topic, byte[] bytes) {
+ CacheEvent evt;
+
+ try {
+ evt = deserializer.deserialize(topic, bytes);
+ }
+ catch (SerializationException e) {
+ throw new DataException("Failed to convert to Kafka Connect data due to a serialization error", e);
+ }
+
+ if (evt == null) {
+ return SchemaAndValue.NULL;
+ }
+ return new SchemaAndValue(null, evt);
+ }
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventDeserializer.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventDeserializer.java
new file mode 100644
index 0000000..3bcf5fb
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventDeserializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect.serialization;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * Deserializer based on {@link JdkMarshaller}.
+ */
+public class CacheEventDeserializer implements Deserializer<CacheEvent> {
+ /** Marshaller. */
+ private static final Marshaller marsh = new JdkMarshaller();
+
+ /** {@inheritDoc} */
+ @Override public void configure(Map<String, ?> map, boolean b) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheEvent deserialize(String topic, byte[] bytes) {
+ try {
+ return U.unmarshal(marsh, bytes, getClass().getClassLoader());
+ }
+ catch (IgniteCheckedException e) {
+ throw new SerializationException("Failed to deserialize cache event!", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ // No-op.
+ }
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventSerializer.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventSerializer.java
new file mode 100644
index 0000000..bc09256
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect.serialization;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * Serializer based on {@link JdkMarshaller}.
+ */
+public class CacheEventSerializer implements Serializer<CacheEvent> {
+ /** Marshaller. */
+ private static final Marshaller marsh = new JdkMarshaller();
+
+ /** {@inheritDoc} */
+ @Override public void configure(Map<String, ?> map, boolean b) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] serialize(String topic, CacheEvent event) {
+ try {
+ return U.marshal(marsh, event);
+ }
+ catch (IgniteCheckedException e) {
+ throw new SerializationException("Failed to serialize cache event!", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ // No-op.
+ }
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/package-info.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/package-info.java
new file mode 100644
index 0000000..726ab39
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains implementation of Kafka Streamer Serializer.
+ */
+
+package org.apache.ignite.stream.kafka.connect.serialization;
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/package-info.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/package-info.java
new file mode 100644
index 0000000..d668d30
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains implementation of Kafka Streamer.
+ */
+
+package org.apache.ignite.stream.kafka;
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
new file mode 100644
index 0000000..c88fba8
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import org.apache.ignite.stream.kafka.connect.IgniteSinkConnectorTest;
+import org.apache.ignite.stream.kafka.connect.IgniteSourceConnectorTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Apache Kafka streamers tests.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ // Kafka streamer.
+ KafkaIgniteStreamerSelfTest.class,
+
+ // Kafka streamers via Connect API.
+ IgniteSinkConnectorTest.class,
+ IgniteSourceConnectorTest.class
+})
+public class IgniteKafkaStreamerSelfTestSuite {
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
new file mode 100644
index 0000000..3f20d87
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
+/**
+ * Tests {@link KafkaStreamer}.
+ */
+public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
+ /** Embedded Kafka. */
+ private TestKafkaBroker embeddedBroker;
+
+ /** Count. */
+ private static final int CNT = 100;
+
+ /** Test topic. */
+ private static final String TOPIC_NAME = "page_visits";
+
+ /** Kafka partition. */
+ private static final int PARTITIONS = 4;
+
+ /** Kafka replication factor. */
+ private static final int REPLICATION_FACTOR = 1;
+
+ /** Topic message key prefix. */
+ private static final String KEY_PREFIX = "192.168.2.";
+
+ /** Topic message value URL. */
+ private static final String VALUE_URL = ",www.example.com,";
+
+ /** Constructor. */
+ public KafkaIgniteStreamerSelfTest() {
+ super(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName).setIncludeEventTypes(EventType.EVTS_ALL);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected void beforeTest() throws Exception {
+ grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+
+ embeddedBroker = new TestKafkaBroker();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ grid().cache(DEFAULT_CACHE_NAME).clear();
+
+ embeddedBroker.shutdown();
+ }
+
+ /**
+ * Tests Kafka streamer.
+ *
+ * @throws TimeoutException If timed out.
+ * @throws InterruptedException If interrupted.
+ */
+ @Test
+ public void testKafkaStreamer() throws TimeoutException, InterruptedException {
+ embeddedBroker.createTopic(TOPIC_NAME, PARTITIONS, REPLICATION_FACTOR);
+
+ Map<String, String> keyValMap = produceStream(TOPIC_NAME);
+
+ consumerStream(TOPIC_NAME, keyValMap);
+ }
+
+ /**
+ * Sends messages to Kafka.
+ *
+ * @param topic Topic name.
+ * @return Map of key value messages.
+ */
+ private Map<String, String> produceStream(String topic) {
+ // Generate random subnets.
+ List<Integer> subnet = new ArrayList<>();
+
+ for (int i = 1; i <= CNT; i++)
+ subnet.add(i);
+
+ Collections.shuffle(subnet);
+
+ List<ProducerRecord<String, String>> messages = new ArrayList<>(CNT);
+
+ Map<String, String> keyValMap = new HashMap<>();
+
+ for (int evt = 0; evt < CNT; evt++) {
+ long runtime = System.currentTimeMillis();
+
+ String ip = KEY_PREFIX + subnet.get(evt);
+
+ String msg = runtime + VALUE_URL + ip;
+
+ messages.add(new ProducerRecord<>(topic, ip, msg));
+
+ keyValMap.put(ip, msg);
+ }
+
+ embeddedBroker.sendMessages(messages);
+
+ return keyValMap;
+ }
+
+ /**
+ * Consumes Kafka stream via Ignite.
+ *
+ * @param topic Topic name.
+ * @param keyValMap Expected key value map.
+ * @throws TimeoutException If timed out.
+ * @throws InterruptedException If interrupted.
+ */
+ private void consumerStream(String topic, Map<String, String> keyValMap)
+ throws TimeoutException, InterruptedException {
+ KafkaStreamer<String, String> kafkaStmr = null;
+
+ Ignite ignite = grid();
+
+ try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
+ stmr.allowOverwrite(true);
+ stmr.autoFlushFrequency(10);
+
+ // Configure Kafka streamer.
+ kafkaStmr = new KafkaStreamer<>();
+
+ // Get the cache.
+ IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+ // Set Ignite instance.
+ kafkaStmr.setIgnite(ignite);
+
+ // Set data streamer instance.
+ kafkaStmr.setStreamer(stmr);
+
+ // Set the topic.
+ kafkaStmr.setTopic(Arrays.asList(topic));
+
+ // Set the number of threads.
+ kafkaStmr.setThreads(4);
+
+ // Set the consumer configuration.
+ kafkaStmr.setConsumerConfig(
+ createDefaultConsumerConfig(embeddedBroker.getBrokerAddress(), "groupX"));
+
+ kafkaStmr.setMultipleTupleExtractor(
+ record -> {
+ Map<String, String> entries = new HashMap<>();
+
+ try {
+ String key = (String)record.key();
+ String val = (String)record.value();
+
+ // Convert the message into number of cache entries with same key or dynamic key from actual message.
+ // For now using key as cache entry key and value as cache entry value - for test purpose.
+ entries.put(key, val);
+ }
+ catch (Exception ex) {
+ fail("Unexpected error." + ex);
+ }
+
+ return entries;
+ });
+
+ // Start kafka streamer.
+ kafkaStmr.start();
+
+ final CountDownLatch latch = new CountDownLatch(CNT);
+
+ IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
+ @IgniteInstanceResource
+ private Ignite ig;
+
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, CacheEvent evt) {
+ latch.countDown();
+
+ if (log.isInfoEnabled()) {
+ IgniteEx igEx = (IgniteEx)ig;
+
+ UUID nodeId = igEx.localNode().id();
+
+ log.info("Recive event=" + evt + ", nodeId=" + nodeId);
+ }
+
+ return true;
+ }
+ };
+
+ ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
+
+ // Checks all events successfully processed in 10 seconds.
+ assertTrue("Failed to wait latch completion, still wait " + latch.getCount() + " events",
+ latch.await(10, TimeUnit.SECONDS));
+
+ for (Map.Entry<String, String> entry : keyValMap.entrySet())
+ assertEquals(entry.getValue(), cache.get(entry.getKey()));
+ }
+ finally {
+ if (kafkaStmr != null)
+ kafkaStmr.stop();
+ }
+ }
+
+ /**
+ * Creates default consumer config.
+ *
+ * @param servers Bootstrap servers' address in the form of <server:port;server:port>.
+ * @param grpId Group Id for kafka subscriber.
+ * @return Kafka consumer configuration.
+ */
+ private Properties createDefaultConsumerConfig(String servers, String grpId) {
+ A.notNull(servers, "bootstrap servers");
+ A.notNull(grpId, "groupId");
+
+ Properties props = new Properties();
+
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, grpId);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+ return props;
+ }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
new file mode 100644
index 0000000..24dbfc8
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.TestUtils;
+import kafka.utils.ZkUtils;
+import kafka.zk.KafkaZkClient;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.curator.test.TestingServer;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.SystemTime;
+import scala.Tuple2;
+
+/**
+ * Kafka Test Broker.
+ */
+public class TestKafkaBroker {
+ /** ZooKeeper connection timeout. */
+ private static final int ZK_CONNECTION_TIMEOUT = 6000;
+
+ /** ZooKeeper session timeout. */
+ private static final int ZK_SESSION_TIMEOUT = 6000;
+
+ /** ZooKeeper port. */
+ private static final int ZK_PORT = 21811;
+
+ /** Broker host. */
+ private static final String BROKER_HOST = "localhost";
+
+ /** Broker port. */
+ private static final int BROKER_PORT = 11092;
+
+ /** Kafka config. */
+ private KafkaConfig kafkaCfg;
+
+ /** Kafka server. */
+ private KafkaServer kafkaSrv;
+
+ /** ZooKeeper. */
+ private TestingServer zkServer;
+
+ /** Kafka Zookeeper utils. */
+ private ZkUtils zkUtils;
+
+ /**
+ * Kafka broker constructor.
+ */
+ public TestKafkaBroker() {
+ try {
+ setupZooKeeper();
+
+ setupKafkaServer();
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to start Kafka: " + e);
+ }
+ }
+
+ /**
+ * Creates a topic.
+ *
+ * @param topic Topic name.
+ * @param partitions Number of partitions for the topic.
+ * @param replicationFactor Replication factor.
+ * @throws TimeoutException If operation is timed out.
+ * @throws InterruptedException If interrupted.
+ */
+ public void createTopic(String topic, int partitions, int replicationFactor)
+ throws TimeoutException, InterruptedException {
+ List<KafkaServer> servers = new ArrayList<>();
+
+ servers.add(kafkaSrv);
+
+ KafkaZkClient client = kafkaSrv.zkClient();
+
+ TestUtils.createTopic(client, topic, partitions, replicationFactor,
+ scala.collection.JavaConversions.asScalaBuffer(servers), new Properties());
+ }
+
+ /**
+ * Sends a message to Kafka broker.
+ *
+ * @param records List of records.
+ * @return Producer used to send the message.
+ */
+ public void sendMessages(List<ProducerRecord<String, String>> records) {
+ Producer<String, String> producer = new KafkaProducer<>(getProducerConfig());
+
+ for (ProducerRecord<String, String> rec : records)
+ producer.send(rec);
+
+ producer.flush();
+ producer.close();
+ }
+
+ /**
+ * Shuts down test Kafka broker.
+ */
+ public void shutdown() {
+ if (zkUtils != null)
+ zkUtils.close();
+
+ if (kafkaSrv != null)
+ kafkaSrv.shutdown();
+
+ if (zkServer != null) {
+ try {
+ zkServer.stop();
+ }
+ catch (IOException ignored) {
+ // No-op.
+ }
+ }
+
+ List<String> logDirs = scala.collection.JavaConversions.seqAsJavaList(kafkaCfg.logDirs());
+
+ for (String logDir : logDirs)
+ U.delete(new File(logDir));
+ }
+
+ /**
+ * Sets up test Kafka broker.
+ *
+ * @throws IOException If failed.
+ */
+ private void setupKafkaServer() throws IOException {
+ kafkaCfg = new KafkaConfig(getKafkaConfig());
+
+ kafkaSrv = TestUtils.createServer(kafkaCfg, new SystemTime());
+
+ kafkaSrv.startup();
+ }
+
+ /**
+ * Sets up ZooKeeper test server.
+ *
+ * @throws Exception If failed.
+ */
+ private void setupZooKeeper() throws Exception {
+ zkServer = new TestingServer(ZK_PORT, true);
+
+ Tuple2<ZkClient, ZkConnection> zkTuple = ZkUtils.createZkClientAndConnection(zkServer.getConnectString(),
+ ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
+
+ zkUtils = new ZkUtils(zkTuple._1(), zkTuple._2(), false);
+ }
+
+ /**
+ * Obtains Kafka config.
+ *
+ * @return Kafka config.
+ * @throws IOException If failed.
+ */
+ private Properties getKafkaConfig() throws IOException {
+ Properties props = new Properties();
+
+ props.put("broker.id", "0");
+ props.put("zookeeper.connect", zkServer.getConnectString());
+ props.put("host.name", BROKER_HOST);
+ props.put("port", BROKER_PORT);
+ props.put("offsets.topic.replication.factor", "1");
+ props.put("log.dir", createTmpDir("_cfg").getAbsolutePath());
+ props.put("log.flush.interval.messages", "1");
+ props.put("log.flush.interval.ms", "10");
+
+ return props;
+ }
+
+ /**
+ * Obtains broker address.
+ *
+ * @return Kafka broker address.
+ */
+ public String getBrokerAddress() {
+ return BROKER_HOST + ":" + BROKER_PORT;
+ }
+
+ /**
+ * Obtains producer config.
+ *
+ * @return Kafka Producer config.
+ */
+ private Properties getProducerConfig() {
+ Properties props = new Properties();
+
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerAddress());
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaTestProducer");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+ return props;
+ }
+
+ /**
+ * Creates temporary directory.
+ *
+ * @param prefix Prefix.
+ * @return Created file.
+ * @throws IOException If failed.
+ */
+ private static File createTmpDir(String prefix) throws IOException {
+ Path path = Files.createTempDirectory(prefix);
+
+ return path.toFile();
+ }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorMock.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorMock.java
new file mode 100644
index 0000000..4c912b9
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorMock.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import org.apache.kafka.connect.connector.Task;
+
+/**
+ * Sink connector mock for tests for using the task mock.
+ */
+public class IgniteSinkConnectorMock extends IgniteSinkConnector {
+ /** {@inheritDoc} */
+ @Override public Class<? extends Task> taskClass() {
+ return IgniteSinkTaskMock.class;
+ }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
new file mode 100644
index 0000000..8a70636
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.lang.reflect.Field;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+import org.apache.ignite.stream.kafka.TestKafkaBroker;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+import static org.easymock.EasyMock.mock;
+
+/**
+ * Tests for {@link IgniteSinkConnector}.
+ */
+public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
+ /** Number of input messages. */
+ private static final int EVENT_CNT = 10000;
+
+ /** Cache name. */
+ private static final String CACHE_NAME = "testCache";
+
+ /** Test topics. */
+ private static final String[] TOPICS = {"sink-test1", "sink-test2"};
+
+ /** Kafka partition. */
+ private static final int PARTITIONS = 3;
+
+ /** Kafka replication factor. */
+ private static final int REPLICATION_FACTOR = 1;
+
+ /** Worker id. */
+ private static final String WORKER_ID = "workerId";
+
+ /** Test Kafka broker. */
+ private TestKafkaBroker kafkaBroker;
+
+ /** Worker to run tasks. */
+ private Worker worker;
+
+ /** Workers' herder. */
+ private Herder herder;
+
+ /** Ignite server node. */
+ private static Ignite grid;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ kafkaBroker = new TestKafkaBroker();
+
+ for (String topic : TOPICS)
+ kafkaBroker.createTopic(topic, PARTITIONS, REPLICATION_FACTOR);
+
+ Map<String, String> props = makeWorkerProps();
+ WorkerConfig workerCfg = new StandaloneConfig(props);
+
+ OffsetBackingStore offBackingStore = mock(OffsetBackingStore.class);
+ offBackingStore.configure(workerCfg);
+
+ worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore);
+ worker.start();
+
+ herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg));
+ herder.start();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ herder.stop();
+
+ worker.stop();
+
+ kafkaBroker.shutdown();
+
+ grid.cache(CACHE_NAME).removeAll();
+
+ // reset cache name to overwrite task configurations.
+ Field field = IgniteSinkTask.class.getDeclaredField("cacheName");
+
+ field.setAccessible(true);
+ field.set(IgniteSinkTask.class, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ IgniteConfiguration cfg = loadConfiguration("example-ignite.xml");
+
+ cfg.setClientMode(false);
+
+ grid = startGrid("igniteServerNode", cfg);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testSinkPutsWithoutTransformation() throws Exception {
+ Map<String, String> sinkProps = makeSinkProps(Utils.join(TOPICS, ","));
+
+ sinkProps.remove(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS);
+
+ testSinkPuts(sinkProps, false);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testSinkPutsWithTransformation() throws Exception {
+ testSinkPuts(makeSinkProps(Utils.join(TOPICS, ",")), true);
+ }
+
+ /**
+ * Tests the whole data flow from injecting data to Kafka to transferring it to the grid. It reads from two
+ * specified Kafka topics, because a sink task can read from multiple topics.
+ *
+ * @param sinkProps Sink properties.
+ * @param keyless Tests on Kafka stream with null keys if true.
+ * @throws Exception Thrown in case of the failure.
+ */
+ private void testSinkPuts(Map<String, String> sinkProps, boolean keyless) throws Exception {
+ FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
+ @Override public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
+ if (error != null)
+ throw new RuntimeException("Failed to create a job!");
+ }
+ });
+
+ herder.putConnectorConfig(
+ sinkProps.get(ConnectorConfig.NAME_CONFIG),
+ sinkProps, false, cb);
+
+ cb.get();
+
+ final CountDownLatch latch = new CountDownLatch(EVENT_CNT * TOPICS.length);
+
+ final IgnitePredicate<Event> putLsnr = new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ assert evt != null;
+
+ latch.countDown();
+
+ return true;
+ }
+ };
+
+ grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).localListen(putLsnr, EVT_CACHE_OBJECT_PUT);
+
+ IgniteCache<String, String> cache = grid.cache(CACHE_NAME);
+
+ assertEquals(0, cache.size(CachePeekMode.PRIMARY));
+
+ Map<String, String> keyValMap = new HashMap<>(EVENT_CNT * TOPICS.length);
+
+ // Produces events for the specified number of topics
+ for (String topic : TOPICS)
+ keyValMap.putAll(produceStream(topic, keyless));
+
+ // Checks all events successfully processed in 10 seconds.
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).stopLocalListen(putLsnr);
+
+ // Checks that each event was processed properly.
+ for (Map.Entry<String, String> entry : keyValMap.entrySet())
+ assertEquals(entry.getValue(), cache.get(entry.getKey()));
+
+ assertEquals(EVENT_CNT * TOPICS.length, cache.size(CachePeekMode.PRIMARY));
+ }
+
+ /**
+ * Sends messages to Kafka.
+ *
+ * @param topic Topic name.
+ * @param keyless Indicates whether a Kafka key is specified.
+ * @return Map of key value messages.
+ */
+ private Map<String, String> produceStream(String topic, boolean keyless) {
+ List<ProducerRecord<String, String>> messages = new ArrayList<>(EVENT_CNT);
+
+ Map<String, String> keyValMap = new HashMap<>();
+
+ for (int evt = 0; evt < EVENT_CNT; evt++) {
+ long runtime = System.currentTimeMillis();
+
+ String key = null;
+ if (!keyless)
+ key = topic + ":" + String.valueOf(evt);
+
+ String msg = topic + ":" + String.valueOf(evt) + "_" + runtime;
+
+ messages.add(new ProducerRecord<>(topic, key, msg));
+
+ if (!keyless)
+ keyValMap.put(key, msg);
+ else
+ keyValMap.put(topic + ":" + String.valueOf(evt), String.valueOf(runtime));
+ }
+
+ kafkaBroker.sendMessages(messages);
+
+ return keyValMap;
+ }
+
+ /**
+ * Creates properties for test sink connector.
+ *
+ * @param topics Topics.
+ * @return Test sink connector properties.
+ */
+ private Map<String, String> makeSinkProps(String topics) {
+ Map<String, String> props = new HashMap<>();
+
+ props.put(SinkConnector.TOPICS_CONFIG, topics);
+ props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
+ props.put(ConnectorConfig.NAME_CONFIG, "test-sink-connector");
+ props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSinkConnectorMock.class.getName());
+ props.put(IgniteSinkConstants.CACHE_NAME, "testCache");
+ props.put(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE, "true");
+ props.put(IgniteSinkConstants.CACHE_CFG_PATH, "example-ignite.xml");
+ props.put(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS,
+ "org.apache.ignite.stream.kafka.connect.IgniteSinkConnectorTest$TestExtractor");
+
+ return props;
+ }
+
+ /**
+ * Creates properties for Kafka Connect workers.
+ *
+ * @return Worker configurations.
+ */
+ private Map<String, String> makeWorkerProps() {
+ Map<String, String> props = new HashMap<>();
+
+ props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+ props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+ props.put("internal.key.converter.schemas.enable", "false");
+ props.put("internal.value.converter.schemas.enable", "false");
+ props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+ props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+ props.put("key.converter.schemas.enable", "false");
+ props.put("value.converter.schemas.enable", "false");
+ props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
+ props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+ // fast flushing for testing.
+ props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
+
+ return props;
+ }
+
+ /**
+ * Test transformer.
+ */
+ static class TestExtractor implements StreamSingleTupleExtractor<SinkRecord, String, String> {
+ /** {@inheritDoc} */
+ @Override public Map.Entry<String, String> extract(SinkRecord msg) {
+ String[] parts = ((String)msg.value()).split("_");
+ return new AbstractMap.SimpleEntry<String, String>(parts[0], parts[1]);
+ }
+ }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTaskMock.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTaskMock.java
new file mode 100644
index 0000000..58a59b5
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTaskMock.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Sink task mock for tests. It avoids closing the grid from test to test.
+ */
+public class IgniteSinkTaskMock extends IgniteSinkTask {
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ // Don't stop the grid for tests.
+ setStopped(true);
+ }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java
new file mode 100644
index 0000000..0157a17
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import org.apache.kafka.connect.connector.Task;
+
+/**
+ * Source connector mock for tests for using the task mock.
+ */
+public class IgniteSourceConnectorMock extends IgniteSourceConnector {
+ /** {@inheritDoc} */
+ @Override public Class<? extends Task> taskClass() {
+ return IgniteSourceTaskMock.class;
+ }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
new file mode 100644
index 0000000..a1109e7
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.stream.kafka.TestKafkaBroker;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
+/**
+ * Tests for {@link IgniteSourceConnector}.
+ */
+public class IgniteSourceConnectorTest extends GridCommonAbstractTest {
+ /** Number of input messages. */
+ private static final int EVENT_CNT = 100;
+
+ /** Cache name. */
+ private static final String CACHE_NAME = "testCache";
+
+ /** Test topics created by connector. */
+ private static final String[] TOPICS = {"src-test1", "src-test2"};
+
+ /** Worker id. */
+ private static final String WORKER_ID = "workerId";
+
+ /** Test Kafka broker. */
+ private TestKafkaBroker kafkaBroker;
+
+ /** Worker to run tasks. */
+ private Worker worker;
+
+ /** Workers' herder. */
+ private Herder herder;
+
+ /** Ignite server node shared among tests. */
+ private static Ignite grid;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ IgniteConfiguration cfg = loadConfiguration("example-ignite.xml");
+
+ cfg.setClientMode(false);
+
+ grid = startGrid("igniteServerNode", cfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ kafkaBroker = new TestKafkaBroker();
+
+ Map<String, String> props = makeWorkerProps();
+ WorkerConfig workerCfg = new StandaloneConfig(props);
+
+ MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
+ offBackingStore.configure(workerCfg);
+
+ worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore);
+ worker.start();
+
+ herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg));
+ herder.start();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ herder.stop();
+
+ worker.stop();
+
+ kafkaBroker.shutdown();
+
+ grid.cache(CACHE_NAME).clear();
+
+ // reset cache name to overwrite task configurations.
+ Field field = IgniteSourceTask.class.getDeclaredField("cacheName");
+
+ field.setAccessible(true);
+ field.set(IgniteSourceTask.class, null);
+ }
+
+ /**
+ * Tests data flow from injecting data into grid and transferring it to Kafka cluster
+ * without user-specified filter.
+ *
+ * @throws Exception Thrown in case of the failure.
+ */
+ @Test
+ public void testEventsInjectedIntoKafkaWithoutFilter() throws Exception {
+ Map<String, String> srcProps = makeSourceProps(Utils.join(TOPICS, ","));
+
+ srcProps.remove(IgniteSourceConstants.CACHE_FILTER_CLASS);
+
+ doTest(srcProps, false);
+ }
+
+ /**
+ * Tests data flow from injecting data into grid and transferring it to Kafka cluster.
+ *
+ * @throws Exception Thrown in case of the failure.
+ */
+ @Test
+ public void testEventsInjectedIntoKafka() throws Exception {
+ doTest(makeSourceProps(Utils.join(TOPICS, ",")), true);
+ }
+
+ /**
+ * Tests the source with the specified source configurations.
+ *
+ * @param srcProps Source properties.
+ * @param conditioned Flag indicating whether filtering is enabled.
+ * @throws Exception Fails if error.
+ */
+ private void doTest(Map<String, String> srcProps, boolean conditioned) throws Exception {
+ FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
+ @Override public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
+ if (error != null)
+ throw new RuntimeException("Failed to create a job!", error);
+ }
+ });
+
+ herder.putConnectorConfig(
+ srcProps.get(ConnectorConfig.NAME_CONFIG),
+ srcProps, true, cb);
+
+ cb.get();
+
+ // Ugh! To be sure Kafka Connect's worker thread is properly started...
+ Thread.sleep(5000);
+
+ final CountDownLatch latch = new CountDownLatch(EVENT_CNT);
+
+ final IgnitePredicate<CacheEvent> locLsnr = new IgnitePredicate<CacheEvent>() {
+ @Override public boolean apply(CacheEvent evt) {
+ assert evt != null;
+
+ latch.countDown();
+
+ return true;
+ }
+ };
+
+ grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).localListen(locLsnr, EVT_CACHE_OBJECT_PUT);
+
+ IgniteCache<String, String> cache = grid.cache(CACHE_NAME);
+
+ assertEquals(0, cache.size(CachePeekMode.PRIMARY));
+
+ Map<String, String> keyValMap = new HashMap<>(EVENT_CNT);
+
+ keyValMap.putAll(sendData());
+
+ // Checks all events are processed.
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).stopLocalListen(locLsnr);
+
+ assertEquals(EVENT_CNT, cache.size(CachePeekMode.PRIMARY));
+
+ // Checks the events are transferred to Kafka broker.
+ if (conditioned)
+ checkDataDelivered(EVENT_CNT * TOPICS.length / 2);
+ else
+ checkDataDelivered(EVENT_CNT * TOPICS.length);
+
+ }
+
+ /**
+ * Sends messages to the grid.
+ *
+ * @return Map of key value messages.
+ * @throws IOException If failed.
+ */
+ private Map<String, String> sendData() throws IOException {
+ Map<String, String> keyValMap = new HashMap<>();
+
+ for (int evt = 0; evt < EVENT_CNT; evt++) {
+ long runtime = System.currentTimeMillis();
+
+ String key = "test_" + String.valueOf(evt);
+ String msg = runtime + String.valueOf(evt);
+
+ if (evt >= EVENT_CNT / 2)
+ key = "conditioned_" + key;
+
+ grid.cache(CACHE_NAME).put(key, msg);
+
+ keyValMap.put(key, msg);
+ }
+
+ return keyValMap;
+ }
+
+ /**
+ * Checks if events were delivered to Kafka server.
+ *
+ * @param expectedEventsCnt Expected events count.
+ * @throws Exception If failed.
+ */
+ private void checkDataDelivered(final int expectedEventsCnt) throws Exception {
+ Properties props = new Properties();
+
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-grp");
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
+ props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.ignite.stream.kafka.connect.serialization.CacheEventDeserializer");
+
+ final KafkaConsumer<String, CacheEvent> consumer = new KafkaConsumer<>(props);
+
+ consumer.subscribe(Arrays.asList(TOPICS));
+
+ final AtomicInteger evtCnt = new AtomicInteger();
+
+ try {
+ // Wait for expected events count.
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ ConsumerRecords<String, CacheEvent> records = consumer.poll(10);
+ for (ConsumerRecord<String, CacheEvent> record : records) {
+ info("Record: " + record);
+
+ evtCnt.getAndIncrement();
+ }
+ return evtCnt.get() >= expectedEventsCnt;
+ }
+ }, 20_000);
+
+ info("Waiting for unexpected records for 5 secs.");
+
+ assertFalse(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ ConsumerRecords<String, CacheEvent> records = consumer.poll(10);
+ for (ConsumerRecord<String, CacheEvent> record : records) {
+ error("Unexpected record: " + record);
+
+ evtCnt.getAndIncrement();
+ }
+ return evtCnt.get() > expectedEventsCnt;
+ }
+ }, 5_000));
+ }
+ catch (WakeupException ignored) {
+ // ignore for shutdown.
+ }
+ finally {
+ consumer.close();
+
+ assertEquals(expectedEventsCnt, evtCnt.get());
+ }
+ }
+
+ /**
+ * Creates properties for test source connector.
+ *
+ * @param topics Topics.
+ * @return Test source connector properties.
+ */
+ private Map<String, String> makeSourceProps(String topics) {
+ Map<String, String> props = new HashMap<>();
+
+ props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
+ props.put(ConnectorConfig.NAME_CONFIG, "test-src-connector");
+ props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSourceConnectorMock.class.getName());
+ props.put(IgniteSourceConstants.CACHE_NAME, "testCache");
+ props.put(IgniteSourceConstants.CACHE_CFG_PATH, "example-ignite.xml");
+ props.put(IgniteSourceConstants.TOPIC_NAMES, topics);
+ props.put(IgniteSourceConstants.CACHE_EVENTS, "put");
+ props.put(IgniteSourceConstants.CACHE_FILTER_CLASS, TestCacheEventFilter.class.getName());
+ props.put(IgniteSourceConstants.INTL_BUF_SIZE, "1000000");
+
+ return props;
+ }
+
+ /**
+ * Creates properties for Kafka Connect workers.
+ *
+ * @return Worker configurations.
+ * @throws IOException If failed.
+ */
+ private Map<String, String> makeWorkerProps() throws IOException {
+ Map<String, String> props = new HashMap<>();
+
+ props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+ props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+ props.put("internal.key.converter.schemas.enable", "false");
+ props.put("internal.value.converter.schemas.enable", "false");
+ props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+ props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter");
+ props.put("key.converter.schemas.enable", "false");
+ props.put("value.converter.schemas.enable", "false");
+ props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
+ props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+ // fast flushing for testing.
+ props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
+
+ return props;
+ }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java
new file mode 100644
index 0000000..4535ad5
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Source task mock for tests. It avoids closing the grid from test to test.
+ */
+public class IgniteSourceTaskMock extends IgniteSourceTask {
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ // Don't stop the grid for tests.
+ stopRemoteListen();
+
+ setStopped(true);
+ }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java
new file mode 100644
index 0000000..bf28a2b
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.lang.IgnitePredicate;
+
+/**
+ * Test user-defined filter.
+ */
+class TestCacheEventFilter implements IgnitePredicate<CacheEvent> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(CacheEvent evt) {
+ return ((String)evt.key()).startsWith("conditioned_");
+ }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/package-info.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/package-info.java
new file mode 100644
index 0000000..d668d30
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains implementation of Kafka Streamer.
+ */
+
+package org.apache.ignite.stream.kafka;
diff --git a/modules/kafka-ext/src/test/resources/example-ignite.xml b/modules/kafka-ext/src/test/resources/example-ignite.xml
new file mode 100644
index 0000000..f23a306
--- /dev/null
+++ b/modules/kafka-ext/src/test/resources/example-ignite.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Ignite configuration with all defaults and enabled p2p deployment and enabled events.
+ Used for testing IgniteSink running Ignite in a client mode.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <!-- Enable peer class loading for remote events. -->
+ <property name="peerClassLoadingEnabled" value="true"/>
+ <!-- Enable client mode. -->
+ <property name="clientMode" value="true"/>
+
+ <!-- Cache accessed from IgniteSink. -->
+ <property name="cacheConfiguration">
+ <list>
+ <!-- Partitioned cache example configuration with configurations adjusted to server nodes'. -->
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="atomicityMode" value="ATOMIC"/>
+
+ <property name="name" value="testCache"/>
+ </bean>
+ </list>
+ </property>
+
+ <!-- Enable cache events. -->
+ <property name="includeEventTypes">
+ <list>
+ <!-- Cache events. -->
+ <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
+ </list>
+ </property>
+
+ <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:47500</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+</beans>
diff --git a/pom.xml b/pom.xml
index 2aa8af9..55d3af5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@
<module>modules/storm-ext</module>
<module>modules/camel-ext</module>
<module>modules/jms11-ext</module>
+ <module>modules/kafka-ext</module>
</modules>
<profiles>