IGNITE-13394 Migrate Kafka module to ignite-extensions - Fixes #23.

Signed-off-by: samaitra <saikat.maitra@gmail.com>
diff --git a/modules/kafka-ext/README.txt b/modules/kafka-ext/README.txt
new file mode 100644
index 0000000..348d143
--- /dev/null
+++ b/modules/kafka-ext/README.txt
@@ -0,0 +1,210 @@
+Apache Ignite Kafka Streamer Module
+-----------------------------------
+
+Apache Ignite Kafka Streamer module provides streaming from Kafka to an Ignite cache.
+
+There are two ways this can be achieved:
+- importing the Kafka Streamer module into your Maven project and instantiating KafkaStreamer for data streaming;
+- using Kafka Connect functionality.
+
+Below are the details.
+
+## Importing Ignite Kafka Streamer Module In Maven Project
+
+If you are using Maven to manage the dependencies of your project, you can add the Kafka module
+dependency like this (replace '${ignite.version}' with the actual Ignite version you are
+interested in):
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                        http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    ...
+    <dependencies>
+        ...
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-kafka-ext</artifactId>
+            <version>${ignite.version}</version>
+        </dependency>
+        ...
+    </dependencies>
+    ...
+</project>
+
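+To use it, instantiate KafkaStreamer, attach it to an Ignite data streamer, and point it at your topics.
+The following is a minimal sketch, assuming a running Kafka broker on localhost:9092 and placeholder
+cache and topic names ('myCache', 'myTopic'); adapt all of these to your setup:
+
+```
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.stream.kafka.KafkaStreamer;
+
+public class KafkaStreamerExample {
+    public static void main(String[] args) {
+        Ignite ignite = Ignition.start();
+
+        // Ensure the target cache exists (placeholder name).
+        ignite.getOrCreateCache("myCache");
+
+        try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache")) {
+            // Allow overwriting existing keys in the cache.
+            stmr.allowOverwrite(true);
+
+            KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
+
+            // Wire the streamer to the Ignite node and the data streamer.
+            kafkaStreamer.setIgnite(ignite);
+            kafkaStreamer.setStreamer(stmr);
+
+            // Topics to subscribe to and the number of polling threads.
+            kafkaStreamer.setTopic(Collections.singletonList("myTopic"));
+            kafkaStreamer.setThreads(4);
+
+            // Plain Kafka consumer configuration.
+            Properties settings = new Properties();
+            settings.put("bootstrap.servers", "localhost:9092");
+            settings.put("group.id", "example-group");
+            settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            kafkaStreamer.setConsumerConfig(settings);
+
+            // Convert each Kafka record into a cache key-value pair.
+            kafkaStreamer.setSingleTupleExtractor(rec ->
+                new AbstractMap.SimpleEntry<>(rec.key().toString(), rec.value().toString()));
+
+            kafkaStreamer.start();
+
+            // ... stream for as long as needed, then:
+            kafkaStreamer.stop();
+        }
+    }
+}
+```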
+
+## Streaming Data to Ignite via Kafka Connect
+
+The Sink Connector helps you export data from Kafka to an Ignite cache. It polls data from Kafka topics and writes it to the user-specified cache.
+For more information on Kafka Connect, see [Kafka Documentation](http://kafka.apache.org/documentation.html#connect).
+
+The connector can be found in 'optional/ignite-kafka'. The connector and its dependencies have to be on the classpath of a running Kafka instance,
+as described in the following subsection.
+
+### Setting up and Running
+
+1. Put the following jar files on Kafka's classpath:
+- ignite-kafka-connect-x.x.x-SNAPSHOT.jar
+- ignite-core-x.x.x-SNAPSHOT.jar
+- ignite-spring-x.x.x-SNAPSHOT.jar
+- cache-api-1.0.0.jar
+- spring-aop-4.1.0.RELEASE.jar
+- spring-beans-4.1.0.RELEASE.jar
+- spring-context-4.1.0.RELEASE.jar
+- spring-core-4.1.0.RELEASE.jar
+- spring-expression-4.1.0.RELEASE.jar
+- commons-logging-1.1.1.jar
+
+2. Prepare worker configurations, e.g.,
+```
+bootstrap.servers=localhost:9092
+
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+key.converter.schemas.enable=false
+value.converter.schemas.enable=false
+
+internal.key.converter=org.apache.kafka.connect.storage.StringConverter
+internal.value.converter=org.apache.kafka.connect.storage.StringConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+
+offset.storage.file.filename=/tmp/connect.offsets
+offset.flush.interval.ms=10000
+```
+
+3. Prepare connector configurations, e.g.,
+```
+# connector
+name=string-ignite-connector
+connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
+tasks.max=2
+topics=testTopic1,testTopic2
+
+# converter (optional)
+singleTupleExtractorCls=my.company.MyTupleExtractor
+
+# cache
+cacheName=cache1
+cacheAllowOverwrite=true
+igniteCfg=/some-path/ignite.xml
+```
+where 'cacheName' is the name of the cache you specify in '/some-path/ignite.xml', into which the data pulled
+from 'testTopic1' and 'testTopic2' is stored. Set 'cacheAllowOverwrite' to true to enable overwriting existing
+values in the cache. You can also set 'cachePerNodeDataSize' and 'cachePerNodeParOps' to adjust the per-node
+buffer size and the maximum number of parallel stream operations for a single node.
+If you need to create an Ignite key from a Kafka value, implement StreamSingleTupleExtractor and specify it as 'singleTupleExtractorCls', as sketched below.
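+
+As a sketch of such an extractor (using the hypothetical my.company.MyTupleExtractor name from the
+configuration above, and assuming the Kafka value is a 'key,value' string):
+
+```
+package my.company;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class MyTupleExtractor implements StreamSingleTupleExtractor<SinkRecord, Object, Object> {
+    /** Derives the Ignite key from the Kafka value; the parsing here is illustrative only. */
+    @Override public Map.Entry<Object, Object> extract(SinkRecord msg) {
+        String[] parts = msg.value().toString().split(",", 2);
+
+        return new AbstractMap.SimpleEntry<>(parts[0], parts[1]);
+    }
+}
+```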
+
+See example-ignite.xml in tests for a simple cache configuration file example.
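+
+If you do not have the tests at hand, a minimal Spring-based cache configuration along these lines
+should be enough (a sketch; the cache name must match 'cacheName' above):
+
+```
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans
+                           http://www.springframework.org/schema/beans/spring-beans.xsd">
+    <bean class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="cacheConfiguration">
+            <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                <property name="name" value="cache1"/>
+            </bean>
+        </property>
+    </bean>
+</beans>
+```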
+
+4. Start the connector, for instance, as follows:
+```
+./bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
+```
+
+## Checking the Flow
+
+To perform a very basic functionality check, you can do the following:
+
+1. Start Zookeeper
+```
+bin/zookeeper-server-start.sh config/zookeeper.properties
+```
+
+2. Start Kafka server
+```
+bin/kafka-server-start.sh config/server.properties
+```
+
+3. Provide some data input to the Kafka server
+```
+bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=,
+k1,v1
+```
+
+4. Start the connector. For example,
+```
+./bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
+```
+
+5. Check that the value is in the cache, for example via REST:
+```
+http://node1:8080/ignite?cmd=size&cacheName=cache1
+```
+
+## Streaming Cache Event Data to Kafka via Kafka Connect
+
+The source connector enables listening to Ignite cache events and, upon filtering, streaming them to Kafka.
+
+The connector can be found in 'optional/ignite-kafka'. The connector and its dependencies have to be on the classpath of a running Kafka instance,
+as described in the following subsection.
+
+### Setting up and Running
+
+1. Put the following jar files on Kafka's classpath:
+- ignite-kafka-connect-x.x.x-SNAPSHOT.jar
+- ignite-core-x.x.x-SNAPSHOT.jar
+- cache-api-1.0.0.jar
+- ignite-spring-x.x.x-SNAPSHOT.jar
+- spring-aop-4.1.0.RELEASE.jar
+- spring-beans-4.1.0.RELEASE.jar
+- spring-context-4.1.0.RELEASE.jar
+- spring-core-4.1.0.RELEASE.jar
+- spring-expression-4.1.0.RELEASE.jar
+- commons-logging-1.1.1.jar
+
+2. Prepare worker configurations, e.g.,
+```
+bootstrap.servers=localhost:9092
+
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter
+key.converter.schemas.enable=false
+value.converter.schemas.enable=false
+
+internal.key.converter=org.apache.kafka.connect.storage.StringConverter
+internal.value.converter=org.apache.kafka.connect.storage.StringConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+
+offset.storage.file.filename=/tmp/connect.offsets
+offset.flush.interval.ms=10000
+```
+
+Note that the current implementation ignores the Kafka Connect key and schema, and stores marshalled cache events
+using org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter.
+
+3. Prepare connector configurations, e.g.,
+```
+# connector
+name=ignite-src-connector
+connector.class=org.apache.ignite.stream.kafka.connect.IgniteSourceConnector
+tasks.max=2
+
+# cache
+topicNames=testTopic1,testTopic2
+cacheEvts=put,removed
+## if you decide to filter remotely (recommended)
+cacheFilterCls=MyFilter
+cacheName=cache1
+igniteCfg=/some-path/ignite.xml
+```
+where 'cacheName' is the name of the cache you specify in '/some-path/ignite.xml', whose events are forwarded
+to 'testTopic1' and 'testTopic2'. Also consider using 'evtBufferSize' and 'evtBatchSize' to tune the internal queue
+used to safely transfer data from the Ignite cache to Kafka.
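+
+The remote filter (the hypothetical 'MyFilter' above) is expected to be an IgnitePredicate over cache
+events; a minimal sketch, assuming you only want to forward events whose keys start with 'user:':
+
+```
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.lang.IgnitePredicate;
+
+public class MyFilter implements IgnitePredicate<CacheEvent> {
+    /** Returns true only for events that should be forwarded to Kafka. */
+    @Override public boolean apply(CacheEvent evt) {
+        Object key = evt.key();
+
+        return key instanceof String && ((String)key).startsWith("user:");
+    }
+}
+```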
+
+The following cache events can be specified in the connector configurations:
+- CREATED
+- DESTROYED
+- PUT
+- READ
+- REMOVED
+- LOCKED
+- UNLOCKED
+- SWAPPED
+- UNSWAPPED
+- EXPIRED
+
+For a simple cache configuration file example, see example-ignite.xml in tests.
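+
+Keep in mind that cache events are disabled in Ignite by default; for the source side to see them, the
+node configuration should enable them, e.g. along these lines (a sketch, assuming Spring's 'util'
+namespace is declared in the file):
+
+```
+<bean class="org.apache.ignite.configuration.IgniteConfiguration">
+    <!-- Enable cache events so the source connector can listen to them. -->
+    <property name="includeEventTypes">
+        <util:constant static-field="org.apache.ignite.events.EventType.EVTS_CACHE"/>
+    </property>
+</bean>
+```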
+
+4. Start the connector, as described in [Kafka Documentation](http://kafka.apache.org/documentation.html#connect).
diff --git a/modules/kafka-ext/licenses/apache-2.0.txt b/modules/kafka-ext/licenses/apache-2.0.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/modules/kafka-ext/licenses/apache-2.0.txt
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/modules/kafka-ext/modules/core/src/test/config/log4j-test.xml b/modules/kafka-ext/modules/core/src/test/config/log4j-test.xml
new file mode 100755
index 0000000..b78fa9c
--- /dev/null
+++ b/modules/kafka-ext/modules/core/src/test/config/log4j-test.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN"
+    "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">
+<!--
+    Log4j configuration.
+-->
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+    <!--
+        Logs System.out messages to console.
+    -->
+    <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+        <!-- Log to STDOUT. -->
+        <param name="Target" value="System.out"/>
+
+        <!-- Log from DEBUG and higher. -->
+        <param name="Threshold" value="DEBUG"/>
+
+        <!-- The default pattern: Date Priority [Category] Message\n -->
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+        </layout>
+
+        <!-- Do not log beyond INFO level. -->
+        <filter class="org.apache.log4j.varia.LevelRangeFilter">
+            <param name="levelMin" value="DEBUG"/>
+            <param name="levelMax" value="INFO"/>
+        </filter>
+    </appender>
+
+    <!--
+        Logs all System.err messages to console.
+    -->
+    <appender name="CONSOLE_ERR" class="org.apache.log4j.ConsoleAppender">
+        <!-- Log to STDERR. -->
+        <param name="Target" value="System.err"/>
+
+        <!-- Log from WARN and higher. -->
+        <param name="Threshold" value="WARN"/>
+
+        <!-- The default pattern: Date Priority [Category] Message\n -->
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+        </layout>
+    </appender>
+
+    <!--
+        Logs all output to specified file.
+    -->
+    <appender name="FILE" class="org.apache.log4j.RollingFileAppender">
+        <param name="Threshold" value="DEBUG"/>
+        <param name="File" value="${IGNITE_HOME}/work/log/ignite.log"/>
+        <param name="Append" value="true"/>
+        <param name="MaxFileSize" value="10MB"/>
+        <param name="MaxBackupIndex" value="10"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+        </layout>
+    </appender>
+
+    <!-- Disable all open source debugging. -->
+    <category name="org">
+        <level value="INFO"/>
+    </category>
+
+    <category name="org.eclipse.jetty">
+        <level value="INFO"/>
+    </category>
+
+    <!-- Default settings. -->
+    <root>
+        <!-- Print at info by default. -->
+        <level value="INFO"/>
+
+        <!-- Append to file and console. -->
+        <appender-ref ref="FILE"/>
+        <appender-ref ref="CONSOLE"/>
+        <appender-ref ref="CONSOLE_ERR"/>
+    </root>
+</log4j:configuration>
diff --git a/modules/kafka-ext/modules/core/src/test/config/tests.properties b/modules/kafka-ext/modules/core/src/test/config/tests.properties
new file mode 100644
index 0000000..18b3606
--- /dev/null
+++ b/modules/kafka-ext/modules/core/src/test/config/tests.properties
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Local address to bind to.
+local.ip=127.0.0.1
+
+# TCP communication port
+comm.tcp.port=30010
+
+# JBoss JNDI
+# JBoss context factory for JNDI connection establishing.
+jboss.jndi.context.factory=org.jnp.interfaces.NamingContextFactory
+# JBoss specific parameter for JNDI connection establishing.
+jboss.jndi.pkg.prefixes=org.jboss.naming:org.jnp.interfaces
+# URL of JBoss server for the 1st node.
+jboss.jndi.node1.provider.url=jnp://localhost:1199
+# URL of JBoss server for the 2nd node.
+jboss.jndi.node2.provider.url=jnp://localhost:1299
+# JBoss Discovery test max wait time.
+jboss.disco.test.wait=180000
+
+# Deployment configuration paths.
+# You will either need to override deploy.uri.dir or supply CLASSES_URI as system property.
+#
+# Path to keystore with private and public keys.
+deploy.uri.secure.keystore=@{IGNITE_HOME}/modules/tests/config/securedeploy/keystore
+# Temporary dir where deployment unit stored before deploy.
+deploy.uri.tmpdir=${java.io.tmpdir}/gg
+# Deployment dir for file scanner test with different types of GAR's.
+deploy.uri.file2.path=${java.io.tmpdir}/gg/verification/
+# URI string.
+deploy.uri.file2=file://freq=200@localhost/${java.io.tmpdir}/gg/verification/
+# File scanner URI for local file deployment.
+deploy.uri.file=file://localhost/@{IGNITE_HOME}/modules/extdata/uri/target/file/
+# FTP scanner URI for FTP deployment.
+deploy.uri.ftp=ftp://ftptest:iddqd@94.72.60.102:21/test/deployment
+# Classes scanner URI for classes deployment. Must be overridden for every user.
+deploy.uri.cls=${CLASSES_URI}
+# Http scanner URI for HTTP deployment.
+deploy.uri.http=http://fake.uri
+# Http scanner URI for secure SSL HTTPs deployment.
+deploy.uri.https=https://fake.uri
+# Directory with descriptors to construct GAR files.
+deploy.gar.descriptor.dir=modules/urideploy/src/test/java/org/apache/ignite/spi/deployment/uri/META-INF
+
+# Directory with a number of descriptors for the Ant gar task.
+ant.gar.descriptor.dir=modules/extdata/p2p/META-INF
+# Temporary directory for the Ant task resulting GAR file.
+ant.gar.tmpdir=${java.io.tmpdir}/gg
+# The same as p2p.uri.cls but without protocol
+ant.gar.srcdir=@{IGNITE_HOME}/modules/extdata/uri/target/classes/
+
+# Paths to use in URI deployment SPI tests
+urideployment.jar.uri=modules/extdata/uri/target/deploy/uri.jar
+urideployment.path.tmp=modules/extdata/uri/target/deploy_tmp/
+
+# GAR paths to use in URI deployment SPI tests
+ant.urideployment.gar.uri=file://freq=5000@localhost/EXTDATA/uri/target/deploy
+ant.urideployment.gar.file=modules/extdata/uri/target/deploy/uri.gar
+ant.urideployment.gar.libs-file=modules/extdata/uri/target/deploy2/uri-libs.gar
+ant.urideployment.gar.classes-file=modules/extdata/uri/target/deploy2/uri-classes.gar
+ant.urideployment.gar.path=modules/extdata/uri/target/deploy/
+
+# Classpath directory for GridP2PUserVersionChangeSelfTest
+ant.userversion.class.dir=@{IGNITE_HOME}/modules/tests/java/
+
+# Multicast discovery self test.
+discovery.mbeanserver.selftest.baseport=50000
+
+# TCP communication self test.
+comm.mbeanserver.selftest.baseport=50100
+
+# Kernel tests.
+grid.comm.selftest.sender.timeout=1000
+grid.comm.selftest.timeout=10000
+
+#P2P tests
+#Overwrite this property. It should point to P2P module compilation directory.
+p2p.uri.cls=file://localhost/@{IGNITE_HOME}/modules/extdata/p2p/target/classes/
+p2p.uri.cls.second=file://localhost/@{IGNITE_HOME}/modules/extdata/uri/target/classes/
+
+# AOP tests.
+# Connector port for RMI.
+connector.rmi.port=7657
+# Connector port for XFire Web Service.
+connector.ws.port=9090
+
+# Load test duration in minutes.
+load.test.duration=500
+load.test.threadnum=50
+load.test.nodenum=5
+
+# Loaders tests
+loader.self.test.config=modules/core/src/test/config/loaders/grid-cfg.xml
+loader.self.multipletest.config=modules/core/src/test/config/loaders/grid-cfg-2-grids.xml
+loader.self.test.jboss.config=modules/core/src/test/config/loaders/grid-cfg.xml
+
+# WebSphere jmx properties
+websphere.jmx.connector.host=localhost
+websphere.jmx.connector.port=8880
+websphere.jmx.connector.security=false
+websphere.jmx.username=
+websphere.jmx.pwd=
+
+# GlassFish jmx properties for GlassFish Loader
+glassfish.jmx.rmi.connector.port=8686
+glassfish.jmx.username=admin
+glassfish.jmx.password=adminadmin
+
+# Tomcat jmx properties for Servlet Loader
+tomcat.jmx.rmi.connector.port=1097
+
+# Marshaller for tests
+#marshaller.class=org.apache.ignite.marshaller.jdk.GridJdkMarshaller
+
+# EC2 configuration for tests
+#amazon.access.key=
+#amazon.secret.key=
+
+# SSH config.
+ssh.username=uname
+ssh.password=passwd
+
+# SSL tests keystore.
+ssl.keystore.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/server.jks
+ssl.keystore.password=123456
+
+# node01 signed with trust-one, node02 and node03 by trust-two, node02old is expired
+# trust-both contains both CAs
+ssl.keystore.node01.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/node01.jks
+ssl.keystore.node02.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/node02.jks
+ssl.keystore.node03.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/node03.jks
+ssl.keystore.trustone.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/trust-one.jks
+ssl.keystore.trusttwo.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/trust-two.jks
+ssl.keystore.trustboth.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/trust-both.jks
+ssl.keystore.node02old.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/node02old.jks
+
+# Hadoop home directory.
+hadoop.home=@{HADOOP_HOME}
diff --git a/modules/kafka-ext/pom.xml b/modules/kafka-ext/pom.xml
new file mode 100644
index 0000000..115d767
--- /dev/null
+++ b/modules/kafka-ext/pom.xml
@@ -0,0 +1,155 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-extensions-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent</relativePath>
+    </parent>
+
+    <artifactId>ignite-kafka-ext</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <url>http://ignite.apache.org</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${ignite.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>${kafka.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-runtime</artifactId>
+            <version>${kafka.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <version>${zookeeper.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <version>${curator.version}</version>
+            <scope>test</scope>
+            <!-- https://github.com/confluentinc/kafka-connect-elasticsearch/issues/143 -->
+            <exclusions>
+                <exclusion>
+                    <artifactId>guava</artifactId>
+                    <groupId>com.google.guava</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-spring</artifactId>
+            <version>${ignite.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-log4j</artifactId>
+            <version>${ignite.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <version>${easymock.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${ignite.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-tools</artifactId>
+            <version>${ignite.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>${kafka.version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Generate the OSGi MANIFEST.MF for this bundle. -->
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
new file mode 100644
index 0000000..02dbedd
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.stream.StreamAdapter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.WakeupException;
+
+/**
+ * Streamer that subscribes to topic messages from a Kafka broker and streams them as key-value pairs into an
+ * {@link IgniteDataStreamer} instance.
+ * <p>
+ * Uses Kafka's consumer API to read messages from Kafka.
+ */
+public class KafkaStreamer<K, V> extends StreamAdapter<ConsumerRecord, K, V> {
+    /** Default polling timeout. */
+    private static final long DFLT_TIMEOUT = 100;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Polling tasks executor. */
+    private ExecutorService executor;
+
+    /** Topics. */
+    private List<String> topics;
+
+    /** Number of threads. */
+    private int threads;
+
+    /** Kafka consumer config. */
+    private Properties consumerCfg;
+
+    /** Polling timeout. */
+    private long timeout = DFLT_TIMEOUT;
+
+    /** Kafka consumer tasks. */
+    private final List<ConsumerTask> consumerTasks = new ArrayList<>();
+
+    /**
+     * Sets the topic names.
+     *
+     * @param topics Topic names.
+     */
+    public void setTopic(List<String> topics) {
+        this.topics = topics;
+    }
+
+    /**
+     * Sets the threads.
+     *
+     * @param threads Number of threads.
+     */
+    public void setThreads(int threads) {
+        this.threads = threads;
+    }
+
+    /**
+     * Sets the consumer config.
+     *
+     * @param consumerCfg Consumer configuration.
+     */
+    public void setConsumerConfig(Properties consumerCfg) {
+        this.consumerCfg = consumerCfg;
+    }
+
+    /**
+     * Sets the polling timeout for Kafka tasks.
+     *
+     * @param timeout Timeout.
+     */
+    public void setTimeout(long timeout) {
+        A.ensure(timeout > 0, "timeout > 0");
+
+        this.timeout = timeout;
+    }
+
+    /**
+     * Starts streamer.
+     *
+     * @throws IgniteException If failed.
+     */
+    public void start() {
+        A.notNull(getStreamer(), "streamer");
+        A.notNull(getIgnite(), "ignite");
+        A.notNull(topics, "topics");
+        A.notNull(consumerCfg, "kafka consumer config");
+        A.ensure(threads > 0, "threads > 0");
+        A.ensure(null != getSingleTupleExtractor() || null != getMultipleTupleExtractor(),
+            "Extractor must be configured");
+
+        log = getIgnite().log();
+
+        executor = Executors.newFixedThreadPool(threads);
+
+        IntStream.range(0, threads).forEach(i -> consumerTasks.add(new ConsumerTask(consumerCfg)));
+
+        for (ConsumerTask task : consumerTasks)
+            executor.submit(task);
+    }
+
+    /**
+     * Stops streamer.
+     */
+    public void stop() {
+        for (ConsumerTask task : consumerTasks)
+            task.stop();
+
+        if (executor != null) {
+            executor.shutdown();
+
+            try {
+                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
+                    if (log.isDebugEnabled())
+                        log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly.");
+            }
+            catch (InterruptedException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Interrupted during shutdown, exiting uncleanly.");
+            }
+        }
+    }
+
+    /** Polling task. */
+    class ConsumerTask implements Callable<Void> {
+        /** Kafka consumer. */
+        private final KafkaConsumer<?, ?> consumer;
+
+        /** Stopped. */
+        private volatile boolean stopped;
+
+        /** Constructor. */
+        public ConsumerTask(Properties consumerCfg) {
+            this.consumer = new KafkaConsumer<>(consumerCfg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() {
+            consumer.subscribe(topics);
+
+            try {
+                while (!stopped) {
+                    for (ConsumerRecord record : consumer.poll(timeout)) {
+                        try {
+                            addMessage(record);
+                        }
+                        catch (Exception e) {
+                            U.error(log, "Record is ignored due to an error [record = " + record + ']', e);
+                        }
+                    }
+                }
+            }
+            catch (WakeupException we) {
+                if (log.isInfoEnabled())
+                    log.info("Consumer is being stopped.");
+            }
+            catch (KafkaException ke) {
+                log.error("Kafka error", ke);
+            }
+            finally {
+                consumer.close();
+            }
+
+            return null;
+        }
+
+        /** Stops the polling task. */
+        public void stop() {
+            stopped = true;
+
+            if (consumer != null)
+                consumer.wakeup();
+        }
+    }
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
new file mode 100644
index 0000000..3fbfd9c
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Sink connector to manage sink tasks that transfer Kafka data to the Ignite grid.
+ */
+public class IgniteSinkConnector extends SinkConnector {
+    /** Sink properties. */
+    private Map<String, String> configProps;
+
+    /** Expected configurations. */
+    private static final ConfigDef CONFIG_DEF = new ConfigDef();
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    /**
+     * A sink lifecycle method. Validates grid-specific sink properties.
+     *
+     * @param props Sink properties.
+     */
+    @Override public void start(Map<String, String> props) {
+        configProps = props;
+
+        try {
+            A.notNullOrEmpty(configProps.get(SinkConnector.TOPICS_CONFIG), "topics");
+            A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_NAME), "cache name");
+            A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_CFG_PATH), "path to cache config file");
+        }
+        catch (IllegalArgumentException e) {
+            throw new ConnectException("Cannot start IgniteSinkConnector due to configuration error", e);
+        }
+    }
+
+    /**
+     * Obtains a sink task class to be instantiated for feeding data into grid.
+     *
+     * @return IgniteSinkTask class.
+     */
+    @Override public Class<? extends Task> taskClass() {
+        return IgniteSinkTask.class;
+    }
+
+    /**
+     * Builds each config for <tt>maxTasks</tt> tasks.
+     *
+     * @param maxTasks Max number of tasks.
+     * @return Task configs.
+     */
+    @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
+        List<Map<String, String>> taskConfigs = new ArrayList<>();
+        Map<String, String> taskProps = new HashMap<>();
+
+        taskProps.putAll(configProps);
+
+        for (int i = 0; i < maxTasks; i++)
+            taskConfigs.add(taskProps);
+
+        return taskConfigs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
new file mode 100644
index 0000000..3fb511a
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Sink configuration strings.
+ */
+public class IgniteSinkConstants {
+    /** Ignite configuration file path. */
+    public static final String CACHE_CFG_PATH = "igniteCfg";
+
+    /** Cache name. */
+    public static final String CACHE_NAME = "cacheName";
+
+    /** Flag to enable overwriting existing values in cache. */
+    public static final String CACHE_ALLOW_OVERWRITE = "cacheAllowOverwrite";
+
+    /** Size of per-node buffer before data is sent to remote node. */
+    public static final String CACHE_PER_NODE_DATA_SIZE = "cachePerNodeDataSize";
+
+    /** Maximum number of parallel stream operations per node. */
+    public static final String CACHE_PER_NODE_PAR_OPS = "cachePerNodeParOps";
+
+    /** Class to transform the entry before feeding into cache. */
+    public static final String SINGLE_TUPLE_EXTRACTOR_CLASS = "singleTupleExtractorCls";
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
new file mode 100644
index 0000000..184f309
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Task to consume sequences of SinkRecords and write data to the grid.
+ */
+public class IgniteSinkTask extends SinkTask {
+    /** Logger. */
+    private static final Logger log = LoggerFactory.getLogger(IgniteSinkTask.class);
+
+    /** Flag for stopped state. */
+    private static volatile boolean stopped = true;
+
+    /** Ignite grid configuration file. */
+    private static String igniteConfigFile;
+
+    /** Cache name. */
+    private static String cacheName;
+
+    /** Entry transformer. */
+    private static StreamSingleTupleExtractor<SinkRecord, Object, Object> extractor;
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return new IgniteSinkConnector().version();
+    }
+
+    /**
+     * Initializes grid client from configPath.
+     *
+     * @param props Task properties.
+     */
+    @Override public void start(Map<String, String> props) {
+        // Each task has the same parameters -- avoid setting more than once.
+        if (cacheName != null)
+            return;
+
+        cacheName = props.get(IgniteSinkConstants.CACHE_NAME);
+        igniteConfigFile = props.get(IgniteSinkConstants.CACHE_CFG_PATH);
+
+        if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))
+            StreamerContext.getStreamer().allowOverwrite(
+                Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)));
+
+        if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE))
+            StreamerContext.getStreamer().perNodeBufferSize(
+                Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE)));
+
+        if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS))
+            StreamerContext.getStreamer().perNodeParallelOperations(
+                Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)));
+
+        if (props.containsKey(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS)) {
+            String transformerCls = props.get(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS);
+            if (transformerCls != null && !transformerCls.isEmpty()) {
+                try {
+                    Class<? extends StreamSingleTupleExtractor> clazz =
+                        (Class<? extends StreamSingleTupleExtractor<SinkRecord, Object, Object>>)
+                            Class.forName(transformerCls);
+
+                    extractor = clazz.newInstance();
+                }
+                catch (Exception e) {
+                    throw new ConnectException("Failed to instantiate the provided transformer!", e);
+                }
+            }
+        }
+
+        stopped = false;
+    }
+
+    /**
+     * Buffers records.
+     *
+     * @param records Records to inject into grid.
+     */
+    @SuppressWarnings("unchecked")
+    @Override public void put(Collection<SinkRecord> records) {
+        try {
+            for (SinkRecord record : records) {
+                // Data is flushed asynchronously when CACHE_PER_NODE_DATA_SIZE is reached.
+                if (extractor != null) {
+                    Map.Entry<Object, Object> entry = extractor.extract(record);
+                    StreamerContext.getStreamer().addData(entry.getKey(), entry.getValue());
+                }
+                else {
+                    if (record.key() != null) {
+                        StreamerContext.getStreamer().addData(record.key(), record.value());
+                    }
+                    else {
+                        log.error("Failed to stream a record with null key!");
+                    }
+                }
+            }
+        }
+        catch (ConnectException e) {
+            log.error("Failed adding record", e);
+
+            throw new ConnectException(e);
+        }
+    }
+
+    /**
+     * Pushes buffered data to grid. Flush interval is configured by worker configurations.
+     *
+     * @param offsets Offset information.
+     */
+    @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        if (stopped)
+            return;
+
+        StreamerContext.getStreamer().flush();
+    }
+
+    /**
+     * Stops the grid client.
+     */
+    @Override public void stop() {
+        if (stopped)
+            return;
+
+        stopped = true;
+
+        StreamerContext.getIgnite().close();
+    }
+
+    /**
+     * Used by unit tests to avoid restarting the node and to keep the <code>stopped</code> flag in a valid state.
+     *
+     * @param stopped Stopped flag.
+     */
+    protected static void setStopped(boolean stopped) {
+        IgniteSinkTask.stopped = stopped;
+
+        extractor = null;
+    }
+
+    /**
+     * Streamer context initializing grid and data streamer instances on demand.
+     */
+    public static class StreamerContext {
+        /** Constructor. */
+        private StreamerContext() {
+        }
+
+        /** Instance holder; uses the initialization-on-demand holder idiom for lazy, thread-safe creation. */
+        private static class Holder {
+            private static final Ignite IGNITE = Ignition.start(igniteConfigFile);
+
+            private static final IgniteDataStreamer STREAMER = IGNITE.dataStreamer(cacheName);
+        }
+
+        /**
+         * Obtains grid instance.
+         *
+         * @return Grid instance.
+         */
+        public static Ignite getIgnite() {
+            return Holder.IGNITE;
+        }
+
+        /**
+         * Obtains data streamer instance.
+         *
+         * @return Data streamer instance.
+         */
+        public static IgniteDataStreamer getStreamer() {
+            return Holder.STREAMER;
+        }
+    }
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java
new file mode 100644
index 0000000..986888e
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceConnector;
+
+/**
+ * Source connector to manage source tasks that listen to registered Ignite grid events and forward them to Kafka.
+ *
+ * Note that only cache events are enabled for streaming.
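+ * <p>
+ * A minimal connector configuration, as a sketch with illustrative values (the property keys are
+ * defined in {@link IgniteSourceConstants}):
+ * <pre>
+ * name=my-ignite-source-connector
+ * connector.class=org.apache.ignite.stream.kafka.connect.IgniteSourceConnector
+ * tasks.max=2
+ * topicNames=someTopic1,someTopic2
+ * cacheName=cache1
+ * igniteCfg=/some-path/ignite.xml
+ * cacheEvts=put,removed
+ * </pre>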
+ */
+public class IgniteSourceConnector extends SourceConnector {
+    /** Source properties. */
+    private Map<String, String> configProps;
+
+    /** Expected configurations. */
+    private static final ConfigDef CONFIG_DEF = new ConfigDef();
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start(Map<String, String> props) {
+        try {
+            A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_NAME), "cache name");
+            A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_CFG_PATH), "path to cache config file");
+            A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_EVENTS), "registered cache events");
+            A.notNullOrEmpty(props.get(IgniteSourceConstants.TOPIC_NAMES), "Kafka topics");
+        }
+        catch (IllegalArgumentException e) {
+            throw new ConnectException("Cannot start IgniteSourceConnector due to configuration error", e);
+        }
+
+        configProps = props;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Class<? extends Task> taskClass() {
+        return IgniteSourceTask.class;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
+        List<Map<String, String>> taskConfigs = new ArrayList<>();
+        Map<String, String> taskProps = new HashMap<>();
+
+        taskProps.putAll(configProps);
+
+        for (int i = 0; i < maxTasks; i++)
+            taskConfigs.add(taskProps);
+
+        return taskConfigs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConstants.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConstants.java
new file mode 100644
index 0000000..7d590e5
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConstants.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Source configuration strings.
+ */
+public class IgniteSourceConstants {
+    /** Ignite configuration file path. */
+    public static final String CACHE_CFG_PATH = "igniteCfg";
+
+    /** Cache name. */
+    public static final String CACHE_NAME = "cacheName";
+
+    /** Events to be listened to. Names correspond to {@link IgniteSourceTask.CacheEvt}. */
+    public static final String CACHE_EVENTS = "cacheEvts";
+
+    /** Internal buffer size. */
+    public static final String INTL_BUF_SIZE = "evtBufferSize";
+
+    /** Size of one chunk drained from the internal buffer. */
+    public static final String INTL_BATCH_SIZE = "evtBatchSize";
+
+    /** User-defined filter class. */
+    public static final String CACHE_FILTER_CLASS = "cacheFilterCls";
+
+    /** Kafka topics (comma-delimited). */
+    public static final String TOPIC_NAMES = "topicNames";
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
new file mode 100644
index 0000000..6e3d43b
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Task to consume remote cluster cache events from the grid and inject them into Kafka.
+ * <p>
+ * Note that a task buffers events in a bounded local queue for more reliable data transfer.
+ * Queue size can be changed by {@link IgniteSourceConstants#INTL_BUF_SIZE}.
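+ * <p>
+ * Event names for {@link IgniteSourceConstants#CACHE_EVENTS} are comma-delimited and parsed
+ * case-insensitively, e.g. {@code cacheEvts=put,removed,expired}; see the {@code CacheEvt} enum for the full list.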
+ */
+public class IgniteSourceTask extends SourceTask {
+    /** Logger. */
+    private static final Logger log = LoggerFactory.getLogger(IgniteSourceTask.class);
+
+    /** Tasks static monitor. */
+    private static final Object lock = new Object();
+
+    /** Event buffer size. */
+    private static int evtBufSize = 100000;
+
+    /** Event buffer. */
+    private static BlockingQueue<CacheEvent> evtBuf = new LinkedBlockingQueue<>(evtBufSize);
+
+    /** Max number of events taken from the buffer at once. */
+    private static int evtBatchSize = 100;
+
+    /** Flag for stopped state. */
+    private static volatile boolean stopped = true;
+
+    /** Ignite grid configuration file. */
+    private static String igniteCfgFile;
+
+    /** Cache name. */
+    private static String cacheName;
+
+    /** Remote Listener id. */
+    private static UUID rmtLsnrId;
+
+    /** Local listener. */
+    private static TaskLocalListener locLsnr = new TaskLocalListener();
+
+    /** User-defined filter. */
+    private static IgnitePredicate<CacheEvent> filter;
+
+    /** Topics. */
+    private static String[] topics;
+
+    /** Offset. */
+    private static final Map<String, Long> offset = Collections.singletonMap("offset", 0L);
+
+    /** Partition. */
+    private static final Map<String, String> srcPartition = Collections.singletonMap("cache", null);
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return new IgniteSourceConnector().version();
+    }
+
+    /**
+     * Filtering is done remotely. Local listener buffers data for injection into Kafka.
+     *
+     * @param props Task properties.
+     */
+    @Override public void start(Map<String, String> props) {
+        synchronized (lock) {
+            // Each task has the same parameters, so avoid setting them more than once.
+            // Nothing to do if the task has already been started.
+            if (!stopped)
+                return;
+
+            cacheName = props.get(IgniteSourceConstants.CACHE_NAME);
+            igniteCfgFile = props.get(IgniteSourceConstants.CACHE_CFG_PATH);
+            topics = props.get(IgniteSourceConstants.TOPIC_NAMES).split("\\s*,\\s*");
+
+            if (props.containsKey(IgniteSourceConstants.INTL_BUF_SIZE))
+                evtBufSize = Integer.parseInt(props.get(IgniteSourceConstants.INTL_BUF_SIZE));
+
+            if (props.containsKey(IgniteSourceConstants.INTL_BATCH_SIZE))
+                evtBatchSize = Integer.parseInt(props.get(IgniteSourceConstants.INTL_BATCH_SIZE));
+
+            if (props.containsKey(IgniteSourceConstants.CACHE_FILTER_CLASS)) {
+                String filterCls = props.get(IgniteSourceConstants.CACHE_FILTER_CLASS);
+                if (filterCls != null && !filterCls.isEmpty()) {
+                    try {
+                        Class<? extends IgnitePredicate<CacheEvent>> clazz =
+                            (Class<? extends IgnitePredicate<CacheEvent>>)Class.forName(filterCls);
+
+                        filter = clazz.newInstance();
+                    }
+                    catch (Exception e) {
+                        log.error("Failed to instantiate the provided filter! " +
+                            "User-enabled filtering is ignored!", e);
+                    }
+                }
+            }
+
+            TaskRemoteFilter rmtLsnr = new TaskRemoteFilter(cacheName);
+
+            try {
+                int[] evts = cacheEvents(props.get(IgniteSourceConstants.CACHE_EVENTS));
+
+                rmtLsnrId = IgniteGrid.getIgnite().events(IgniteGrid.getIgnite().cluster().forCacheNodes(cacheName))
+                    .remoteListen(locLsnr, rmtLsnr, evts);
+            }
+            catch (Exception e) {
+                log.error("Failed to register event listener!", e);
+
+                throw new ConnectException(e);
+            }
+            finally {
+                stopped = false;
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<SourceRecord> poll() throws InterruptedException {
+        ArrayList<SourceRecord> records = new ArrayList<>(evtBatchSize);
+        ArrayList<CacheEvent> evts = new ArrayList<>(evtBatchSize);
+
+        if (stopped)
+            return records;
+
+        try {
+            if (evtBuf.drainTo(evts, evtBatchSize) > 0) {
+                for (CacheEvent evt : evts) {
+                    // Schema and keys are ignored.
+                    for (String topic : topics)
+                        records.add(new SourceRecord(srcPartition, offset, topic, null, evt));
+                }
+
+                return records;
+            }
+        }
+        catch (IgniteException e) {
+            log.error("Error when polling event queue!", e);
+        }
+
+        // No events drained; returning null signals that no data is available and allows shutdown.
+        return null;
+    }
+
+    /**
+     * Converts comma-delimited cache events strings to Ignite internal representation.
+     *
+     * @param evtPropsStr Comma-delimited cache event names.
+     * @return Ignite internal representation of cache events to be registered with the remote listener.
+     * @throws Exception If error.
+     */
+    private int[] cacheEvents(String evtPropsStr) throws Exception {
+        String[] evtStr = evtPropsStr.split("\\s*,\\s*");
+
+        if (evtStr.length == 0)
+            return EventType.EVTS_CACHE;
+
+        int[] evts = new int[evtStr.length];
+
+        try {
+            for (int i = 0; i < evtStr.length; i++)
+                evts[i] = CacheEvt.valueOf(evtStr[i].toUpperCase()).getId();
+        }
+        catch (Exception e) {
+            log.error("Failed to recognize the provided cache event!", e);
+
+            throw new Exception(e);
+        }
+        return evts;
+    }
+
+    /**
+     * Stops the grid client.
+     */
+    @Override public synchronized void stop() {
+        if (stopped)
+            return;
+
+        stopped = true;
+
+        stopRemoteListen();
+
+        IgniteGrid.getIgnite().close();
+    }
+
+    /**
+     * Stops the remote listener.
+     */
+    protected void stopRemoteListen() {
+        if (rmtLsnrId != null)
+            IgniteGrid.getIgnite().events(IgniteGrid.getIgnite().cluster().forCacheNodes(cacheName))
+                .stopRemoteListen(rmtLsnrId);
+
+        rmtLsnrId = null;
+    }
+
+    /**
+     * Used by unit tests to avoid restarting the node and to keep the <code>stopped</code> flag in a valid state.
+     *
+     * @param stopped Stopped flag.
+     */
+    protected static void setStopped(boolean stopped) {
+        IgniteSourceTask.stopped = stopped;
+    }
+
+    /**
+     * Local listener buffering cache events to be further sent to Kafka.
+     */
+    private static class TaskLocalListener implements IgniteBiPredicate<UUID, CacheEvent> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(UUID id, CacheEvent evt) {
+            try {
+                if (!evtBuf.offer(evt, 10, TimeUnit.MILLISECONDS))
+                    log.error("Failed to buffer event {}", evt.name());
+            }
+            catch (InterruptedException e) {
+                log.error("Interrupted while buffering an event!", e);
+
+                Thread.currentThread().interrupt();
+            }
+
+            return true;
+        }
+    }
+
+    /**
+     * Remote filter.
+     */
+    private static class TaskRemoteFilter implements IgnitePredicate<CacheEvent> {
+        /** */
+        @IgniteInstanceResource
+        Ignite ignite;
+
+        /** Cache name. */
+        private final String cacheName;
+
+        /**
+         * @param cacheName Cache name.
+         */
+        TaskRemoteFilter(String cacheName) {
+            this.cacheName = cacheName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(CacheEvent evt) {
+            Affinity<Object> affinity = ignite.affinity(cacheName);
+
+            if (affinity.isPrimary(ignite.cluster().localNode(), evt.key())) {
+                // Process the event on the primary node only; backups ignore it.
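+                // A user-provided filter acts as an exclusion predicate: events it matches are not forwarded.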
+                if (filter != null && filter.apply(evt))
+                    return false;
+
+                return true;
+            }
+
+            return false;
+        }
+    }
+
+    /**
+     * Grid instance initialized on demand.
+     */
+    private static class IgniteGrid {
+        /** Constructor. */
+        private IgniteGrid() {
+            // No-op.
+        }
+
+        /** Instance holder. */
+        private static class Holder {
+            /** */
+            private static final Ignite IGNITE = Ignition.start(igniteCfgFile);
+        }
+
+        /**
+         * Obtains grid instance.
+         *
+         * @return Grid instance.
+         */
+        private static Ignite getIgnite() {
+            return Holder.IGNITE;
+        }
+    }
+
+    /** Cache events available for listening. */
+    private enum CacheEvt {
+        /** */
+        CREATED(EventType.EVT_CACHE_ENTRY_CREATED),
+        /** */
+        DESTROYED(EventType.EVT_CACHE_ENTRY_DESTROYED),
+        /** */
+        PUT(EventType.EVT_CACHE_OBJECT_PUT),
+        /** */
+        READ(EventType.EVT_CACHE_OBJECT_READ),
+        /** */
+        REMOVED(EventType.EVT_CACHE_OBJECT_REMOVED),
+        /** */
+        LOCKED(EventType.EVT_CACHE_OBJECT_LOCKED),
+        /** */
+        UNLOCKED(EventType.EVT_CACHE_OBJECT_UNLOCKED),
+        /** */
+        EXPIRED(EventType.EVT_CACHE_OBJECT_EXPIRED);
+
+        /** Internal Ignite event id. */
+        private final int id;
+
+        /**
+         * Constructor.
+         *
+         * @param id Internal Ignite event id.
+         */
+        CacheEvt(int id) {
+            this.id = id;
+        }
+
+        /**
+         * Gets Ignite event id.
+         *
+         * @return Ignite event id.
+         */
+        int getId() {
+            return id;
+        }
+    }
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/package-info.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/package-info.java
new file mode 100644
index 0000000..f565c04
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains implementation of Kafka Streamer Connector.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventConverter.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventConverter.java
new file mode 100644
index 0000000..6a60bad
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventConverter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect.serialization;
+
+import java.util.Map;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.storage.Converter;
+
+/**
+ * {@link CacheEvent} converter for Connect API.
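+ * <p>
+ * As a sketch, the converter could be wired into a Connect worker configuration to carry
+ * {@code CacheEvent} values through Kafka, e.g.
+ * {@code value.converter=org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter}.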
+ */
+public class CacheEventConverter implements Converter {
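+    /** Deserializer for incoming cache events. */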
+    private final CacheEventDeserializer deserializer = new CacheEventDeserializer();
+
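+    /** Serializer for outgoing cache events. */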
+    private final CacheEventSerializer serializer = new CacheEventSerializer();
+
+    /** {@inheritDoc} */
+    @Override public void configure(Map<String, ?> map, boolean b) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] fromConnectData(String topic, Schema schema, Object o) {
+        try {
+            return serializer.serialize(topic, (CacheEvent)o);
+        }
+        catch (SerializationException e) {
+            throw new DataException("Failed to convert to byte[] due to a serialization error", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public SchemaAndValue toConnectData(String topic, byte[] bytes) {
+        CacheEvent evt;
+
+        try {
+            evt = deserializer.deserialize(topic, bytes);
+        }
+        catch (SerializationException e) {
+            throw new DataException("Failed to convert to Kafka Connect data due to a serialization error", e);
+        }
+
+        if (evt == null) {
+            return SchemaAndValue.NULL;
+        }
+        return new SchemaAndValue(null, evt);
+    }
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventDeserializer.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventDeserializer.java
new file mode 100644
index 0000000..3bcf5fb
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventDeserializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect.serialization;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * Deserializer based on {@link JdkMarshaller}.
+ */
+public class CacheEventDeserializer implements Deserializer<CacheEvent> {
+    /** Marshaller. */
+    private static final Marshaller marsh = new JdkMarshaller();
+
+    /** {@inheritDoc} */
+    @Override public void configure(Map<String, ?> map, boolean b) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheEvent deserialize(String topic, byte[] bytes) {
+        try {
+            return U.unmarshal(marsh, bytes, getClass().getClassLoader());
+        }
+        catch (IgniteCheckedException e) {
+            throw new SerializationException("Failed to deserialize cache event!", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // No-op.
+    }
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventSerializer.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventSerializer.java
new file mode 100644
index 0000000..bc09256
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect.serialization;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * Serializer based on {@link JdkMarshaller}.
+ */
+public class CacheEventSerializer implements Serializer<CacheEvent> {
+    /** Marshaller. */
+    private static final Marshaller marsh = new JdkMarshaller();
+
+    /** {@inheritDoc} */
+    @Override public void configure(Map<String, ?> map, boolean b) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] serialize(String topic, CacheEvent event) {
+        try {
+            return U.marshal(marsh, event);
+        }
+        catch (IgniteCheckedException e) {
+            throw new SerializationException("Failed to serialize cache event!", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // No-op.
+    }
+}
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/package-info.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/package-info.java
new file mode 100644
index 0000000..726ab39
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains implementation of Kafka Streamer Serializer.
+ */
+
+package org.apache.ignite.stream.kafka.connect.serialization;
diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/package-info.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/package-info.java
new file mode 100644
index 0000000..d668d30
--- /dev/null
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/package-info.java
@@ -0,0 +1,22 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * Contains implementation of Kafka Streamer.
+ */
+
+package org.apache.ignite.stream.kafka;
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
new file mode 100644
index 0000000..c88fba8
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import org.apache.ignite.stream.kafka.connect.IgniteSinkConnectorTest;
+import org.apache.ignite.stream.kafka.connect.IgniteSourceConnectorTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Apache Kafka streamer tests.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    // Kafka streamer.
+    KafkaIgniteStreamerSelfTest.class,
+
+    // Kafka streamers via Connect API.
+    IgniteSinkConnectorTest.class,
+    IgniteSourceConnectorTest.class
+})
+public class IgniteKafkaStreamerSelfTestSuite {
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
new file mode 100644
index 0000000..3f20d87
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
+/**
+ * Tests {@link KafkaStreamer}.
+ */
+public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
+    /** Embedded Kafka. */
+    private TestKafkaBroker embeddedBroker;
+
+    /** Count. */
+    private static final int CNT = 100;
+
+    /** Test topic. */
+    private static final String TOPIC_NAME = "page_visits";
+
+    /** Number of Kafka partitions. */
+    private static final int PARTITIONS = 4;
+
+    /** Kafka replication factor. */
+    private static final int REPLICATION_FACTOR = 1;
+
+    /** Topic message key prefix. */
+    private static final String KEY_PREFIX = "192.168.2.";
+
+    /** Topic message value URL. */
+    private static final String VALUE_URL = ",www.example.com,";
+
+    /** Constructor. */
+    public KafkaIgniteStreamerSelfTest() {
+        super(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName).setIncludeEventTypes(EventType.EVTS_ALL);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void beforeTest() throws Exception {
+        grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+
+        embeddedBroker = new TestKafkaBroker();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        grid().cache(DEFAULT_CACHE_NAME).clear();
+
+        embeddedBroker.shutdown();
+    }
+
+    /**
+     * Tests Kafka streamer.
+     *
+     * @throws TimeoutException If timed out.
+     * @throws InterruptedException If interrupted.
+     */
+    @Test
+    public void testKafkaStreamer() throws TimeoutException, InterruptedException {
+        embeddedBroker.createTopic(TOPIC_NAME, PARTITIONS, REPLICATION_FACTOR);
+
+        Map<String, String> keyValMap = produceStream(TOPIC_NAME);
+
+        consumerStream(TOPIC_NAME, keyValMap);
+    }
+
+    /**
+     * Sends messages to Kafka.
+     *
+     * @param topic Topic name.
+     * @return Map of key value messages.
+     */
+    private Map<String, String> produceStream(String topic) {
+        // Generate random subnets.
+        List<Integer> subnet = new ArrayList<>();
+
+        for (int i = 1; i <= CNT; i++)
+            subnet.add(i);
+
+        Collections.shuffle(subnet);
+
+        List<ProducerRecord<String, String>> messages = new ArrayList<>(CNT);
+
+        Map<String, String> keyValMap = new HashMap<>();
+
+        for (int evt = 0; evt < CNT; evt++) {
+            long runtime = System.currentTimeMillis();
+
+            String ip = KEY_PREFIX + subnet.get(evt);
+
+            String msg = runtime + VALUE_URL + ip;
+
+            messages.add(new ProducerRecord<>(topic, ip, msg));
+
+            keyValMap.put(ip, msg);
+        }
+
+        embeddedBroker.sendMessages(messages);
+
+        return keyValMap;
+    }
+
+    /**
+     * Consumes Kafka stream via Ignite.
+     *
+     * @param topic Topic name.
+     * @param keyValMap Expected key value map.
+     * @throws TimeoutException If timed out.
+     * @throws InterruptedException If interrupted.
+     */
+    private void consumerStream(String topic, Map<String, String> keyValMap)
+        throws TimeoutException, InterruptedException {
+        KafkaStreamer<String, String> kafkaStmr = null;
+
+        Ignite ignite = grid();
+
+        try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
+            stmr.allowOverwrite(true);
+            stmr.autoFlushFrequency(10);
+
+            // Configure Kafka streamer.
+            kafkaStmr = new KafkaStreamer<>();
+
+            // Get the cache.
+            IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+            // Set Ignite instance.
+            kafkaStmr.setIgnite(ignite);
+
+            // Set data streamer instance.
+            kafkaStmr.setStreamer(stmr);
+
+            // Set the topic.
+            kafkaStmr.setTopic(Arrays.asList(topic));
+
+            // Set the number of threads.
+            kafkaStmr.setThreads(4);
+
+            // Set the consumer configuration.
+            kafkaStmr.setConsumerConfig(
+                createDefaultConsumerConfig(embeddedBroker.getBrokerAddress(), "groupX"));
+
+            kafkaStmr.setMultipleTupleExtractor(
+                record -> {
+                    Map<String, String> entries = new HashMap<>();
+
+                    try {
+                        String key = (String)record.key();
+                        String val = (String)record.value();
+
+                        // A message could be converted into several cache entries with the same key, or with keys derived from the message itself.
+                        // For test purposes, the Kafka key and value are used directly as the cache entry key and value.
+                        entries.put(key, val);
+                    }
+                    catch (Exception ex) {
+                        fail("Unexpected error: " + ex);
+                    }
+
+                    return entries;
+                });
+
+            // Start kafka streamer.
+            kafkaStmr.start();
+
+            final CountDownLatch latch = new CountDownLatch(CNT);
+
+            IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
+                @IgniteInstanceResource
+                private Ignite ig;
+
+                @LoggerResource
+                private IgniteLogger log;
+
+                /** {@inheritDoc} */
+                @Override public boolean apply(UUID uuid, CacheEvent evt) {
+                    latch.countDown();
+
+                    if (log.isInfoEnabled()) {
+                        IgniteEx igEx = (IgniteEx)ig;
+
+                        UUID nodeId = igEx.localNode().id();
+
+                        log.info("Received event=" + evt + ", nodeId=" + nodeId);
+                    }
+
+                    return true;
+                }
+            };
+
+            ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
+
+            // Checks that all events are successfully processed within 10 seconds.
+            assertTrue("Failed to wait for latch completion, still waiting for " + latch.getCount() + " events",
+                latch.await(10, TimeUnit.SECONDS));
+
+            for (Map.Entry<String, String> entry : keyValMap.entrySet())
+                assertEquals(entry.getValue(), cache.get(entry.getKey()));
+        }
+        finally {
+            if (kafkaStmr != null)
+                kafkaStmr.stop();
+        }
+    }
+
+    /**
+     * Creates default consumer config.
+     *
+     * @param servers Bootstrap servers' address in the form of &lt;server:port,server:port&gt;.
+     * @param grpId Group id for the Kafka subscriber.
+     * @return Kafka consumer configuration.
+     */
+    private Properties createDefaultConsumerConfig(String servers, String grpId) {
+        A.notNull(servers, "bootstrap servers");
+        A.notNull(grpId, "groupId");
+
+        Properties props = new Properties();
+
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, grpId);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+        return props;
+    }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
new file mode 100644
index 0000000..24dbfc8
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.TestUtils;
+import kafka.utils.ZkUtils;
+import kafka.zk.KafkaZkClient;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.curator.test.TestingServer;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.SystemTime;
+import scala.Tuple2;
+
+/**
+ * Kafka Test Broker.
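+ * <p>
+ * Typical test usage, mirroring the streamer tests in this module:
+ * <pre>
+ * TestKafkaBroker broker = new TestKafkaBroker();
+ * broker.createTopic("testTopic", 1, 1);
+ * broker.sendMessages(records);
+ * broker.shutdown();
+ * </pre>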
+ */
+public class TestKafkaBroker {
+    /** ZooKeeper connection timeout. */
+    private static final int ZK_CONNECTION_TIMEOUT = 6000;
+
+    /** ZooKeeper session timeout. */
+    private static final int ZK_SESSION_TIMEOUT = 6000;
+
+    /** ZooKeeper port. */
+    private static final int ZK_PORT = 21811;
+
+    /** Broker host. */
+    private static final String BROKER_HOST = "localhost";
+
+    /** Broker port. */
+    private static final int BROKER_PORT = 11092;
+
+    /** Kafka config. */
+    private KafkaConfig kafkaCfg;
+
+    /** Kafka server. */
+    private KafkaServer kafkaSrv;
+
+    /** ZooKeeper. */
+    private TestingServer zkServer;
+
+    /** Kafka Zookeeper utils. */
+    private ZkUtils zkUtils;
+
+    /**
+     * Kafka broker constructor.
+     */
+    public TestKafkaBroker() {
+        try {
+            setupZooKeeper();
+
+            setupKafkaServer();
+        }
+        catch (Exception e) {
+            throw new RuntimeException("Failed to start Kafka!", e);
+        }
+    }
+
+    /**
+     * Creates a topic.
+     *
+     * @param topic Topic name.
+     * @param partitions Number of partitions for the topic.
+     * @param replicationFactor Replication factor.
+     * @throws TimeoutException If operation is timed out.
+     * @throws InterruptedException If interrupted.
+     */
+    public void createTopic(String topic, int partitions, int replicationFactor)
+        throws TimeoutException, InterruptedException {
+        List<KafkaServer> servers = new ArrayList<>();
+
+        servers.add(kafkaSrv);
+
+        KafkaZkClient client = kafkaSrv.zkClient();
+
+        TestUtils.createTopic(client, topic, partitions, replicationFactor,
+            scala.collection.JavaConversions.asScalaBuffer(servers), new Properties());
+    }
+
+    /**
+     * Sends messages to the Kafka broker.
+     *
+     * @param records List of records to send.
+     */
+    public void sendMessages(List<ProducerRecord<String, String>> records) {
+        Producer<String, String> producer = new KafkaProducer<>(getProducerConfig());
+
+        for (ProducerRecord<String, String> rec : records)
+            producer.send(rec);
+
+        producer.flush();
+        producer.close();
+    }
+
+    /**
+     * Shuts down test Kafka broker.
+     */
+    public void shutdown() {
+        if (zkUtils != null)
+            zkUtils.close();
+
+        if (kafkaSrv != null)
+            kafkaSrv.shutdown();
+
+        if (zkServer != null) {
+            try {
+                zkServer.stop();
+            }
+            catch (IOException ignored) {
+                // No-op.
+            }
+        }
+
+        List<String> logDirs = scala.collection.JavaConversions.seqAsJavaList(kafkaCfg.logDirs());
+
+        for (String logDir : logDirs)
+            U.delete(new File(logDir));
+    }
+
+    /**
+     * Sets up test Kafka broker.
+     *
+     * @throws IOException If failed.
+     */
+    private void setupKafkaServer() throws IOException {
+        kafkaCfg = new KafkaConfig(getKafkaConfig());
+
+        kafkaSrv = TestUtils.createServer(kafkaCfg, new SystemTime());
+
+        kafkaSrv.startup();
+    }
+
+    /**
+     * Sets up ZooKeeper test server.
+     *
+     * @throws Exception If failed.
+     */
+    private void setupZooKeeper() throws Exception {
+        zkServer = new TestingServer(ZK_PORT, true);
+
+        Tuple2<ZkClient, ZkConnection> zkTuple = ZkUtils.createZkClientAndConnection(zkServer.getConnectString(),
+            ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
+
+        zkUtils = new ZkUtils(zkTuple._1(), zkTuple._2(), false);
+    }
+
+    /**
+     * Obtains Kafka config.
+     *
+     * @return Kafka config.
+     * @throws IOException If failed.
+     */
+    private Properties getKafkaConfig() throws IOException {
+        Properties props = new Properties();
+
+        props.put("broker.id", "0");
+        props.put("zookeeper.connect", zkServer.getConnectString());
+        props.put("host.name", BROKER_HOST);
+        props.put("port", BROKER_PORT);
+        props.put("offsets.topic.replication.factor", "1");
+        props.put("log.dir", createTmpDir("_cfg").getAbsolutePath());
+        props.put("log.flush.interval.messages", "1");
+        props.put("log.flush.interval.ms", "10");
+
+        return props;
+    }
+
+    /**
+     * Obtains broker address.
+     *
+     * @return Kafka broker address.
+     */
+    public String getBrokerAddress() {
+        return BROKER_HOST + ":" + BROKER_PORT;
+    }
+
+    /**
+     * Obtains producer config.
+     *
+     * @return Kafka Producer config.
+     */
+    private Properties getProducerConfig() {
+        Properties props = new Properties();
+
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerAddress());
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaTestProducer");
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+        return props;
+    }
+
+    /**
+     * Creates temporary directory.
+     *
+     * @param prefix Prefix.
+     * @return Created file.
+     * @throws IOException If failed.
+     */
+    private static File createTmpDir(String prefix) throws IOException {
+        Path path = Files.createTempDirectory(prefix);
+
+        return path.toFile();
+    }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorMock.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorMock.java
new file mode 100644
index 0000000..4c912b9
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorMock.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import org.apache.kafka.connect.connector.Task;
+
+/**
+ * Sink connector mock that makes tests use the sink task mock.
+ */
+public class IgniteSinkConnectorMock extends IgniteSinkConnector {
+    /** {@inheritDoc} */
+    @Override public Class<? extends Task> taskClass() {
+        return IgniteSinkTaskMock.class;
+    }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
new file mode 100644
index 0000000..8a70636
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.lang.reflect.Field;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+import org.apache.ignite.stream.kafka.TestKafkaBroker;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+import static org.easymock.EasyMock.mock;
+
+/**
+ * Tests for {@link IgniteSinkConnector}.
+ */
+public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
+    /** Number of input messages. */
+    private static final int EVENT_CNT = 10000;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "testCache";
+
+    /** Test topics. */
+    private static final String[] TOPICS = {"sink-test1", "sink-test2"};
+
+    /** Number of Kafka partitions. */
+    private static final int PARTITIONS = 3;
+
+    /** Kafka replication factor. */
+    private static final int REPLICATION_FACTOR = 1;
+
+    /** Worker id. */
+    private static final String WORKER_ID = "workerId";
+
+    /** Test Kafka broker. */
+    private TestKafkaBroker kafkaBroker;
+
+    /** Worker to run tasks. */
+    private Worker worker;
+
+    /** Workers' herder. */
+    private Herder herder;
+
+    /** Ignite server node. */
+    private static Ignite grid;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        kafkaBroker = new TestKafkaBroker();
+
+        for (String topic : TOPICS)
+            kafkaBroker.createTopic(topic, PARTITIONS, REPLICATION_FACTOR);
+
+        Map<String, String> props = makeWorkerProps();
+        WorkerConfig workerCfg = new StandaloneConfig(props);
+
+        OffsetBackingStore offBackingStore = mock(OffsetBackingStore.class);
+        offBackingStore.configure(workerCfg);
+
+        worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore);
+        worker.start();
+
+        herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg));
+        herder.start();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        herder.stop();
+
+        worker.stop();
+
+        kafkaBroker.shutdown();
+
+        grid.cache(CACHE_NAME).removeAll();
+
+        // Reset the static cache name so the next test's task configuration can set it afresh.
+        Field field = IgniteSinkTask.class.getDeclaredField("cacheName");
+
+        field.setAccessible(true);
+        field.set(IgniteSinkTask.class, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        IgniteConfiguration cfg = loadConfiguration("example-ignite.xml");
+
+        cfg.setClientMode(false);
+
+        grid = startGrid("igniteServerNode", cfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSinkPutsWithoutTransformation() throws Exception {
+        Map<String, String> sinkProps = makeSinkProps(Utils.join(TOPICS, ","));
+
+        sinkProps.remove(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS);
+
+        testSinkPuts(sinkProps, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSinkPutsWithTransformation() throws Exception {
+        testSinkPuts(makeSinkProps(Utils.join(TOPICS, ",")), true);
+    }
+
+    /**
+     * Tests the whole data flow from injecting data to Kafka to transferring it to the grid. It reads from two
+     * specified Kafka topics, because a sink task can read from multiple topics.
+     *
+     * @param sinkProps Sink properties.
+     * @param keyless If true, tests the Kafka stream with null keys.
+     * @throws Exception Thrown in case of failure.
+     */
+    private void testSinkPuts(Map<String, String> sinkProps, boolean keyless) throws Exception {
+        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
+            @Override public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
+                if (error != null)
+                    throw new RuntimeException("Failed to create a job!", error);
+            }
+        });
+
+        herder.putConnectorConfig(
+            sinkProps.get(ConnectorConfig.NAME_CONFIG),
+            sinkProps, false, cb);
+
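+        // Block until the connector is deployed.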
+        cb.get();
+
+        final CountDownLatch latch = new CountDownLatch(EVENT_CNT * TOPICS.length);
+
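+        // Counts down the latch on each cache PUT event.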
+        final IgnitePredicate<Event> putLsnr = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                assert evt != null;
+
+                latch.countDown();
+
+                return true;
+            }
+        };
+
+        grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).localListen(putLsnr, EVT_CACHE_OBJECT_PUT);
+
+        IgniteCache<String, String> cache = grid.cache(CACHE_NAME);
+
+        assertEquals(0, cache.size(CachePeekMode.PRIMARY));
+
+        Map<String, String> keyValMap = new HashMap<>(EVENT_CNT * TOPICS.length);
+
+        // Produces events for each of the specified topics.
+        for (String topic : TOPICS)
+            keyValMap.putAll(produceStream(topic, keyless));
+
+        // Checks that all events are processed within 10 seconds.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+        grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).stopLocalListen(putLsnr);
+
+        // Checks that each event was processed properly.
+        for (Map.Entry<String, String> entry : keyValMap.entrySet())
+            assertEquals(entry.getValue(), cache.get(entry.getKey()));
+
+        assertEquals(EVENT_CNT * TOPICS.length, cache.size(CachePeekMode.PRIMARY));
+    }
+
+    /**
+     * Sends messages to Kafka.
+     *
+     * @param topic Topic name.
+     * @param keyless If true, the messages are sent without Kafka keys.
+     * @return Map of expected cache keys to values.
+     */
+    private Map<String, String> produceStream(String topic, boolean keyless) {
+        List<ProducerRecord<String, String>> messages = new ArrayList<>(EVENT_CNT);
+
+        Map<String, String> keyValMap = new HashMap<>();
+
+        for (int evt = 0; evt < EVENT_CNT; evt++) {
+            long runtime = System.currentTimeMillis();
+
+            String key = null;
+
+            if (!keyless)
+                key = topic + ":" + evt;
+
+            String msg = topic + ":" + evt + "_" + runtime;
+
+            messages.add(new ProducerRecord<>(topic, key, msg));
+
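+            // In keyless mode the expected entry mirrors the key/value that TestExtractor derives from the message.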
+            if (!keyless)
+                keyValMap.put(key, msg);
+            else
+                keyValMap.put(topic + ":" + evt, String.valueOf(runtime));
+        }
+
+        kafkaBroker.sendMessages(messages);
+
+        return keyValMap;
+    }
+
+    /**
+     * Creates properties for test sink connector.
+     *
+     * @param topics Topics.
+     * @return Test sink connector properties.
+     */
+    private Map<String, String> makeSinkProps(String topics) {
+        Map<String, String> props = new HashMap<>();
+
+        props.put(SinkConnector.TOPICS_CONFIG, topics);
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
+        props.put(ConnectorConfig.NAME_CONFIG, "test-sink-connector");
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSinkConnectorMock.class.getName());
+        props.put(IgniteSinkConstants.CACHE_NAME, "testCache");
+        props.put(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE, "true");
+        props.put(IgniteSinkConstants.CACHE_CFG_PATH, "example-ignite.xml");
+        props.put(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS,
+            "org.apache.ignite.stream.kafka.connect.IgniteSinkConnectorTest$TestExtractor");
+
+        return props;
+    }
+
+    /**
+     * Creates properties for Kafka Connect workers.
+     *
+     * @return Worker configurations.
+     */
+    private Map<String, String> makeWorkerProps() {
+        Map<String, String> props = new HashMap<>();
+
+        props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        props.put("internal.key.converter.schemas.enable", "false");
+        props.put("internal.value.converter.schemas.enable", "false");
+        props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        props.put("key.converter.schemas.enable", "false");
+        props.put("value.converter.schemas.enable", "false");
+        props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
+        props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        // Fast offset flushing for testing.
+        props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
+
+        return props;
+    }
+
+    /**
+     * Test transformer.
+     */
+    static class TestExtractor implements StreamSingleTupleExtractor<SinkRecord, String, String> {
+        /** {@inheritDoc} */
+        @Override public Map.Entry<String, String> extract(SinkRecord msg) {
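+            // Test messages have the form "<topic>:<evt>_<timestamp>"; split into key and value on '_'.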
+            String[] parts = ((String)msg.value()).split("_");
+            return new AbstractMap.SimpleEntry<String, String>(parts[0], parts[1]);
+        }
+    }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTaskMock.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTaskMock.java
new file mode 100644
index 0000000..58a59b5
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTaskMock.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Sink task mock for tests. It avoids shutting down the grid between tests.
+ */
+public class IgniteSinkTaskMock extends IgniteSinkTask {
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        // Don't stop the grid for tests.
+        setStopped(true);
+    }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java
new file mode 100644
index 0000000..0157a17
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import org.apache.kafka.connect.connector.Task;
+
+/**
+ * Source connector mock that makes tests use the task mock.
+ */
+public class IgniteSourceConnectorMock extends IgniteSourceConnector {
+    /** {@inheritDoc} */
+    @Override public Class<? extends Task> taskClass() {
+        return IgniteSourceTaskMock.class;
+    }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
new file mode 100644
index 0000000..a1109e7
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.stream.kafka.TestKafkaBroker;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
+/**
+ * Tests for {@link IgniteSourceConnector}.
+ */
+public class IgniteSourceConnectorTest extends GridCommonAbstractTest {
+    /** Number of input messages. */
+    private static final int EVENT_CNT = 100;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "testCache";
+
+    /** Test topics created by connector. */
+    private static final String[] TOPICS = {"src-test1", "src-test2"};
+
+    /** Worker id. */
+    private static final String WORKER_ID = "workerId";
+
+    /** Test Kafka broker. */
+    private TestKafkaBroker kafkaBroker;
+
+    /** Worker to run tasks. */
+    private Worker worker;
+
+    /** Workers' herder. */
+    private Herder herder;
+
+    /** Ignite server node shared among tests. */
+    private static Ignite grid;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        IgniteConfiguration cfg = loadConfiguration("example-ignite.xml");
+
+        cfg.setClientMode(false);
+
+        grid = startGrid("igniteServerNode", cfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        kafkaBroker = new TestKafkaBroker();
+
+        Map<String, String> props = makeWorkerProps();
+        WorkerConfig workerCfg = new StandaloneConfig(props);
+
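+        // The source connector commits offsets, so a real in-memory store is used instead of a mock.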
+        MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
+        offBackingStore.configure(workerCfg);
+
+        worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore);
+        worker.start();
+
+        herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg));
+        herder.start();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        herder.stop();
+
+        worker.stop();
+
+        kafkaBroker.shutdown();
+
+        grid.cache(CACHE_NAME).clear();
+
+        // Reset the static cache name so the next test can set its own task configuration.
+        Field field = IgniteSourceTask.class.getDeclaredField("cacheName");
+
+        field.setAccessible(true);
+        field.set(IgniteSourceTask.class, null);
+    }
+
+    /**
+     * Tests data flow from injecting data into grid and transferring it to Kafka cluster
+     * without user-specified filter.
+     *
+     * @throws Exception Thrown in case of failure.
+     */
+    @Test
+    public void testEventsInjectedIntoKafkaWithoutFilter() throws Exception {
+        Map<String, String> srcProps = makeSourceProps(Utils.join(TOPICS, ","));
+
+        srcProps.remove(IgniteSourceConstants.CACHE_FILTER_CLASS);
+
+        doTest(srcProps, false);
+    }
+
+    /**
+     * Tests data flow from injecting data into grid and transferring it to Kafka cluster.
+     *
+     * @throws Exception Thrown in case of failure.
+     */
+    @Test
+    public void testEventsInjectedIntoKafka() throws Exception {
+        doTest(makeSourceProps(Utils.join(TOPICS, ",")), true);
+    }
+
+    /**
+     * Tests the source with the specified source configurations.
+     *
+     * @param srcProps Source properties.
+     * @param conditioned Flag indicating whether filtering is enabled.
+     * @throws Exception If failed.
+     */
+    private void doTest(Map<String, String> srcProps, boolean conditioned) throws Exception {
+        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
+            @Override public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
+                if (error != null)
+                    throw new RuntimeException("Failed to create a job!", error);
+            }
+        });
+
+        herder.putConnectorConfig(
+            srcProps.get(ConnectorConfig.NAME_CONFIG),
+            srcProps, true, cb);
+
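+        // Block until the connector is deployed.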
+        cb.get();
+
+        // Wait to make sure Kafka Connect's worker thread has properly started.
+        Thread.sleep(5000);
+
+        final CountDownLatch latch = new CountDownLatch(EVENT_CNT);
+
+        final IgnitePredicate<CacheEvent> locLsnr = new IgnitePredicate<CacheEvent>() {
+            @Override public boolean apply(CacheEvent evt) {
+                assert evt != null;
+
+                latch.countDown();
+
+                return true;
+            }
+        };
+
+        grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).localListen(locLsnr, EVT_CACHE_OBJECT_PUT);
+
+        IgniteCache<String, String> cache = grid.cache(CACHE_NAME);
+
+        assertEquals(0, cache.size(CachePeekMode.PRIMARY));
+
+        Map<String, String> keyValMap = new HashMap<>(EVENT_CNT);
+
+        keyValMap.putAll(sendData());
+
+        // Checks that all events are processed within 10 seconds.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+        grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).stopLocalListen(locLsnr);
+
+        assertEquals(EVENT_CNT, cache.size(CachePeekMode.PRIMARY));
+
+        // Checks the events were transferred to the Kafka broker; with the filter on, only the 'conditioned_' half is delivered.
+        if (conditioned)
+            checkDataDelivered(EVENT_CNT * TOPICS.length / 2);
+        else
+            checkDataDelivered(EVENT_CNT * TOPICS.length);
+    }
+
+    /**
+     * Sends messages to the grid.
+     *
+     * @return Map of the key-value pairs put into the cache.
+     * @throws IOException If failed.
+     */
+    private Map<String, String> sendData() throws IOException {
+        Map<String, String> keyValMap = new HashMap<>();
+
+        for (int evt = 0; evt < EVENT_CNT; evt++) {
+            long runtime = System.currentTimeMillis();
+
+            String key = "test_" + String.valueOf(evt);
+            String msg = runtime + String.valueOf(evt);
+
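+            // Prefix half of the keys so they match TestCacheEventFilter when filtering is enabled.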
+            if (evt >= EVENT_CNT / 2)
+                key = "conditioned_" + key;
+
+            grid.cache(CACHE_NAME).put(key, msg);
+
+            keyValMap.put(key, msg);
+        }
+
+        return keyValMap;
+    }
+
+    /**
+     * Checks if events were delivered to Kafka server.
+     *
+     * @param expectedEventsCnt Expected events count.
+     * @throws Exception If failed.
+     */
+    private void checkDataDelivered(final int expectedEventsCnt) throws Exception {
+        Properties props = new Properties();
+
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-grp");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
+        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+            "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+            "org.apache.ignite.stream.kafka.connect.serialization.CacheEventDeserializer");
+
+        final KafkaConsumer<String, CacheEvent> consumer = new KafkaConsumer<>(props);
+
+        consumer.subscribe(Arrays.asList(TOPICS));
+
+        final AtomicInteger evtCnt = new AtomicInteger();
+
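+        // First wait until the expected number of records arrives, then verify that no extra records follow.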
+        try {
+            // Wait for expected events count.
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    ConsumerRecords<String, CacheEvent> records = consumer.poll(10);
+                    for (ConsumerRecord<String, CacheEvent> record : records) {
+                        info("Record: " + record);
+
+                        evtCnt.getAndIncrement();
+                    }
+                    return evtCnt.get() >= expectedEventsCnt;
+                }
+            }, 20_000);
+
+            info("Waiting for unexpected records for 5 secs.");
+
+            assertFalse(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    ConsumerRecords<String, CacheEvent> records = consumer.poll(10);
+                    for (ConsumerRecord<String, CacheEvent> record : records) {
+                        error("Unexpected record: " + record);
+
+                        evtCnt.getAndIncrement();
+                    }
+                    return evtCnt.get() > expectedEventsCnt;
+                }
+            }, 5_000));
+        }
+        catch (WakeupException ignored) {
+            // Ignored on consumer shutdown.
+        }
+        finally {
+            consumer.close();
+
+            assertEquals(expectedEventsCnt, evtCnt.get());
+        }
+    }
+
+    /**
+     * Creates properties for test source connector.
+     *
+     * @param topics Topics.
+     * @return Test source connector properties.
+     */
+    private Map<String, String> makeSourceProps(String topics) {
+        Map<String, String> props = new HashMap<>();
+
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
+        props.put(ConnectorConfig.NAME_CONFIG, "test-src-connector");
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSourceConnectorMock.class.getName());
+        props.put(IgniteSourceConstants.CACHE_NAME, "testCache");
+        props.put(IgniteSourceConstants.CACHE_CFG_PATH, "example-ignite.xml");
+        props.put(IgniteSourceConstants.TOPIC_NAMES, topics);
+        props.put(IgniteSourceConstants.CACHE_EVENTS, "put");
+        props.put(IgniteSourceConstants.CACHE_FILTER_CLASS, TestCacheEventFilter.class.getName());
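+        // Internal event buffer size; assumed large enough that no events are dropped during the test.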
+        props.put(IgniteSourceConstants.INTL_BUF_SIZE, "1000000");
+
+        return props;
+    }
+
+    /**
+     * Creates properties for Kafka Connect workers.
+     *
+     * @return Worker configurations.
+     * @throws IOException If failed.
+     */
+    private Map<String, String> makeWorkerProps() throws IOException {
+        Map<String, String> props = new HashMap<>();
+
+        props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        props.put("internal.key.converter.schemas.enable", "false");
+        props.put("internal.value.converter.schemas.enable", "false");
+        props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter");
+        props.put("key.converter.schemas.enable", "false");
+        props.put("value.converter.schemas.enable", "false");
+        props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
+        props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        // Fast offset flushing for testing.
+        props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
+
+        return props;
+    }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java
new file mode 100644
index 0000000..4535ad5
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Source task mock for tests. It avoids shutting down the grid between tests.
+ */
+public class IgniteSourceTaskMock extends IgniteSourceTask {
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        // Don't stop the grid for tests.
+        stopRemoteListen();
+
+        setStopped(true);
+    }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java
new file mode 100644
index 0000000..bf28a2b
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.lang.IgnitePredicate;
+
+/**
+ * Test user-defined filter.
+ */
+class TestCacheEventFilter implements IgnitePredicate<CacheEvent> {
+    /** {@inheritDoc} */
+    @Override public boolean apply(CacheEvent evt) {
+        return ((String)evt.key()).startsWith("conditioned_");
+    }
+}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/package-info.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/package-info.java
new file mode 100644
index 0000000..d668d30
--- /dev/null
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/package-info.java
@@ -0,0 +1,22 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * Contains tests for the Kafka Streamer.
+ */
+
+package org.apache.ignite.stream.kafka;
diff --git a/modules/kafka-ext/src/test/resources/example-ignite.xml b/modules/kafka-ext/src/test/resources/example-ignite.xml
new file mode 100644
index 0000000..f23a306
--- /dev/null
+++ b/modules/kafka-ext/src/test/resources/example-ignite.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    Ignite configuration with all defaults, with peer class loading and cache events enabled.
+    Used for testing IgniteSink and IgniteSource with Ignite running in client mode.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util
+        http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <!-- Enable peer class loading for remote events. -->
+        <property name="peerClassLoadingEnabled" value="true"/>
+        <!-- Enable client mode. -->
+        <property name="clientMode" value="true"/>
+
+        <!-- Cache accessed from IgniteSink. -->
+        <property name="cacheConfiguration">
+            <list>
+                <!-- Partitioned cache example configuration with configurations adjusted to server nodes'. -->
+                <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="atomicityMode" value="ATOMIC"/>
+
+                    <property name="name" value="testCache"/>
+                </bean>
+            </list>
+        </property>
+
+        <!-- Enable cache events. -->
+        <property name="includeEventTypes">
+            <list>
+                <!-- Cache events. -->
+                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
+            </list>
+        </property>
+
+        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
+        <property name="discoverySpi">
+            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+                <property name="ipFinder">
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                        <property name="addresses">
+                            <list>
+                                <value>127.0.0.1:47500</value>
+                            </list>
+                        </property>
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+</beans>
diff --git a/pom.xml b/pom.xml
index 2aa8af9..55d3af5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@
         <module>modules/storm-ext</module>
         <module>modules/camel-ext</module>
         <module>modules/jms11-ext</module>
+        <module>modules/kafka-ext</module>
     </modules>
 
     <profiles>