RYA-487 Closes #296, Implement Kafka Connect Sinks for Accumulo and MongoDB backed Rya.
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java
index ed76b4a..cbfe2ea 100644
--- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java
@@ -62,14 +62,14 @@
         super();
     }
 
-    public AccumuloRdfConfiguration(Configuration other) {
+    public AccumuloRdfConfiguration(final Configuration other) {
         super(other);
     }
 
-    public AccumuloRdfConfigurationBuilder getBuilder() {
+    public static AccumuloRdfConfigurationBuilder getBuilder() {
     	return new AccumuloRdfConfigurationBuilder();
     }
-    
+
     /**
      * Creates an AccumuloRdfConfiguration object from a Properties file.  This method assumes
      * that all values in the Properties file are Strings and that the Properties file uses the keys below.
@@ -94,26 +94,26 @@
      * @param props - Properties file containing Accumulo specific configuration parameters
      * @return AccumuloRdfConfiguration with properties set
      */
-    
-    public static AccumuloRdfConfiguration fromProperties(Properties props) {
+
+    public static AccumuloRdfConfiguration fromProperties(final Properties props) {
     	return AccumuloRdfConfigurationBuilder.fromProperties(props).build();
     }
-    
+
     @Override
     public AccumuloRdfConfiguration clone() {
         return new AccumuloRdfConfiguration(this);
     }
-    
+
     /**
      * Sets the Accumulo username from the configuration object that is meant to
      * be used when connecting a {@link Connector} to Accumulo.
      *
      */
-    public void setAccumuloUser(String user) {
+    public void setAccumuloUser(final String user) {
     	Preconditions.checkNotNull(user);
     	set(CLOUDBASE_USER, user);
     }
-    
+
     /**
      * Get the Accumulo username from the configuration object that is meant to
      * be used when connecting a {@link Connector} to Accumulo.
@@ -121,19 +121,19 @@
      * @return The username if one could be found; otherwise {@code null}.
      */
     public String getAccumuloUser(){
-    	return get(CLOUDBASE_USER); 
+    	return get(CLOUDBASE_USER);
     }
-    
+
     /**
      * Sets the Accumulo password from the configuration object that is meant to
      * be used when connecting a {@link Connector} to Accumulo.
      *
      */
-    public void setAccumuloPassword(String password) {
+    public void setAccumuloPassword(final String password) {
     	Preconditions.checkNotNull(password);
     	set(CLOUDBASE_PASSWORD, password);
     }
-    
+
     /**
      * Get the Accumulo password from the configuration object that is meant to
      * be used when connecting a {@link Connector} to Accumulo.
@@ -143,18 +143,18 @@
     public String getAccumuloPassword() {
     	return get(CLOUDBASE_PASSWORD);
     }
-    
+
     /**
      * Sets a comma delimited list of the names of the Zookeeper servers from
      * the configuration object that is meant to be used when connecting a
      * {@link Connector} to Accumulo.
      *
      */
-    public void setAccumuloZookeepers(String zookeepers) {
+    public void setAccumuloZookeepers(final String zookeepers) {
     	Preconditions.checkNotNull(zookeepers);
     	set(CLOUDBASE_ZOOKEEPERS, zookeepers);
     }
-    
+
     /**
      * Get a comma delimited list of the names of the Zookeeper servers from
      * the configuration object that is meant to be used when connecting a
@@ -165,17 +165,17 @@
     public String getAccumuloZookeepers() {
     	return get(CLOUDBASE_ZOOKEEPERS);
     }
-    
+
     /**
      * Sets the Accumulo instance name from the configuration object that is
      * meant to be used when connecting a {@link Connector} to Accumulo.
      *
      */
-    public void setAccumuloInstance(String instance) {
+    public void setAccumuloInstance(final String instance) {
     	Preconditions.checkNotNull(instance);
     	set(CLOUDBASE_INSTANCE, instance);
     }
-    
+
     /**
      * Get the Accumulo instance name from the configuration object that is
      * meant to be used when connecting a {@link Connector} to Accumulo.
@@ -185,15 +185,15 @@
     public String getAccumuloInstance() {
     	return get(CLOUDBASE_INSTANCE);
     }
-    
+
     /**
      * Tells the Rya instance to use a Mock instance of Accumulo as its backing.
      *
      */
-    public void setUseMockAccumulo(boolean useMock) {
+    public void setUseMockAccumulo(final boolean useMock) {
     	setBoolean(USE_MOCK_INSTANCE, useMock);
     }
-    
+
     /**
      * Indicates that a Mock instance of Accumulo is being used to back the Rya instance.
      *
@@ -202,12 +202,12 @@
     public boolean getUseMockAccumulo() {
     	return getBoolean(USE_MOCK_INSTANCE, false);
     }
-    
+
 
     /**
      * @param enabled - {@code true} if the Rya instance is backed by a mock Accumulo; otherwise {@code false}.
      */
-    public void useMockInstance(boolean enabled) {
+    public void useMockInstance(final boolean enabled) {
         super.setBooleanIfUnset(USE_MOCK_INSTANCE, enabled);
     }
 
@@ -224,7 +224,7 @@
      * @param username - The Accumulo username from the configuration object that is meant to
      *   be used when connecting a {@link Connector} to Accumulo.
      */
-    public void setUsername(String username) {
+    public void setUsername(final String username) {
         super.set(CLOUDBASE_USER, username);
     }
 
@@ -242,7 +242,7 @@
      * @param password - The Accumulo password from the configuration object that is meant to
      * be used when connecting a {@link Connector} to Accumulo.
      */
-    public void setPassword(String password) {
+    public void setPassword(final String password) {
         super.set(CLOUDBASE_PASSWORD, password);
     }
 
@@ -260,7 +260,7 @@
      * @param instanceName - The Accumulo instance name from the configuration object that is
      * meant to be used when connecting a {@link Connector} to Accumulo.
      */
-    public void setInstanceName(String instanceName) {
+    public void setInstanceName(final String instanceName) {
         super.set(CLOUDBASE_INSTANCE, instanceName);
     }
 
@@ -279,7 +279,7 @@
      * the configuration object that is meant to be used when connecting a
      * {@link Connector} to Accumulo.
      */
-    public void setZookeepers(String zookeepers) {
+    public void setZookeepers(final String zookeepers) {
         super.set(CLOUDBASE_ZOOKEEPERS, zookeepers);
     }
 
@@ -295,14 +295,14 @@
     }
 
     public Authorizations getAuthorizations() {
-        String[] auths = getAuths();
+        final String[] auths = getAuths();
         if (auths == null || auths.length == 0) {
             return AccumuloRdfConstants.ALL_AUTHORIZATIONS;
         }
         return new Authorizations(auths);
     }
 
-    public void setMaxRangesForScanner(Integer max) {
+    public void setMaxRangesForScanner(final Integer max) {
         setInt(MAXRANGES_SCANNER, max);
     }
 
@@ -310,9 +310,9 @@
         return getInt(MAXRANGES_SCANNER, 2);
     }
 
-    public void setAdditionalIndexers(Class<? extends AccumuloIndexer>... indexers) {
-        List<String> strs = Lists.newArrayList();
-        for (Class<? extends AccumuloIndexer> ai : indexers){
+    public void setAdditionalIndexers(final Class<? extends AccumuloIndexer>... indexers) {
+        final List<String> strs = Lists.newArrayList();
+        for (final Class<? extends AccumuloIndexer> ai : indexers){
             strs.add(ai.getName());
         }
 
@@ -326,25 +326,25 @@
         return getBoolean(CONF_FLUSH_EACH_UPDATE, true);
     }
 
-    public void setFlush(boolean flush){
+    public void setFlush(final boolean flush){
         setBoolean(CONF_FLUSH_EACH_UPDATE, flush);
     }
 
-    public void setAdditionalIterators(IteratorSetting... additionalIterators){
+    public void setAdditionalIterators(final IteratorSetting... additionalIterators){
         //TODO do we need to worry about cleaning up
         this.set(ITERATOR_SETTINGS_SIZE, Integer.toString(additionalIterators.length));
         int i = 0;
-        for(IteratorSetting iterator : additionalIterators) {
+        for(final IteratorSetting iterator : additionalIterators) {
             this.set(String.format(ITERATOR_SETTINGS_NAME, i), iterator.getName());
             this.set(String.format(ITERATOR_SETTINGS_CLASS, i), iterator.getIteratorClass());
             this.set(String.format(ITERATOR_SETTINGS_PRIORITY, i), Integer.toString(iterator.getPriority()));
-            Map<String, String> options = iterator.getOptions();
+            final Map<String, String> options = iterator.getOptions();
 
             this.set(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i), Integer.toString(options.size()));
-            Iterator<Entry<String, String>> it = options.entrySet().iterator();
+            final Iterator<Entry<String, String>> it = options.entrySet().iterator();
             int j = 0;
             while(it.hasNext()) {
-                Entry<String, String> item = it.next();
+                final Entry<String, String> item = it.next();
                 this.set(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j), item.getKey());
                 this.set(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j), item.getValue());
                 j++;
@@ -354,22 +354,22 @@
     }
 
     public IteratorSetting[] getAdditionalIterators(){
-        int size = Integer.valueOf(this.get(ITERATOR_SETTINGS_SIZE, "0"));
+        final int size = Integer.valueOf(this.get(ITERATOR_SETTINGS_SIZE, "0"));
         if(size == 0) {
             return new IteratorSetting[0];
         }
 
-        IteratorSetting[] settings = new IteratorSetting[size];
+        final IteratorSetting[] settings = new IteratorSetting[size];
         for(int i = 0; i < size; i++) {
-            String name = this.get(String.format(ITERATOR_SETTINGS_NAME, i));
-            String iteratorClass = this.get(String.format(ITERATOR_SETTINGS_CLASS, i));
-            int priority = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_PRIORITY, i)));
+            final String name = this.get(String.format(ITERATOR_SETTINGS_NAME, i));
+            final String iteratorClass = this.get(String.format(ITERATOR_SETTINGS_CLASS, i));
+            final int priority = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_PRIORITY, i)));
 
-            int optionsSize = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i)));
-            Map<String, String> options = new HashMap<>(optionsSize);
+            final int optionsSize = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i)));
+            final Map<String, String> options = new HashMap<>(optionsSize);
             for(int j = 0; j < optionsSize; j++) {
-                String key = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j));
-                String value = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j));
+                final String key = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j));
+                final String value = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j));
                 options.put(key, value);
             }
             settings[i] = new IteratorSetting(priority, name, iteratorClass, options);
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
index d49f2ee..b207d79 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
@@ -274,17 +274,17 @@
      * on their child subtrees.
      * @param value whether to use aggregation pipeline optimization.
      */
-    public void setUseAggregationPipeline(boolean value) {
+    public void setUseAggregationPipeline(final boolean value) {
         setBoolean(USE_AGGREGATION_PIPELINE, value);
     }
 
     @Override
     public List<Class<QueryOptimizer>> getOptimizers() {
-        List<Class<QueryOptimizer>> optimizers = super.getOptimizers();
+        final List<Class<QueryOptimizer>> optimizers = super.getOptimizers();
         if (getUseAggregationPipeline()) {
-            Class<?> cl = AggregationPipelineQueryOptimizer.class;
+            final Class<?> cl = AggregationPipelineQueryOptimizer.class;
             @SuppressWarnings("unchecked")
-            Class<QueryOptimizer> optCl = (Class<QueryOptimizer>) cl;
+            final Class<QueryOptimizer> optCl = (Class<QueryOptimizer>) cl;
             optimizers.add(optCl);
         }
         return optimizers;
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
index d2fe58a..77c77cd 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
@@ -438,6 +438,9 @@
         return Optional.fromNullable(conf.get(FLUO_APP_NAME));
     }
 
+    public static void setUseMongo(final Configuration conf, final boolean useMongo) {
+        conf.setBoolean(USE_MONGO, useMongo);
+    }
 
     public static boolean getUseMongo(final Configuration conf) {
         return conf.getBoolean(USE_MONGO, false);
diff --git a/extras/kafka.connect/README.md b/extras/kafka.connect/README.md
new file mode 100644
index 0000000..03b63c2
--- /dev/null
+++ b/extras/kafka.connect/README.md
@@ -0,0 +1,22 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+The parent project for all Rya Kafka Connect work. All projects that are part 
+of that system must use this project's pom as their parent pom.
+
+For more information about Rya's Kafka Connect support, see 
+[the manual](../rya.manual/src/site/markdown/kafka-connect-integration.md). 
\ No newline at end of file
diff --git a/extras/kafka.connect/accumulo-it/README.md b/extras/kafka.connect/accumulo-it/README.md
new file mode 100644
index 0000000..abcc12d
--- /dev/null
+++ b/extras/kafka.connect/accumulo-it/README.md
@@ -0,0 +1,19 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+This project contains integration tests that verify an Accumulo backed 
+implementation of the Rya Kafka Connect Sink is working properly.
\ No newline at end of file
diff --git a/extras/kafka.connect/accumulo-it/pom.xml b/extras/kafka.connect/accumulo-it/pom.xml
new file mode 100644
index 0000000..af088a9
--- /dev/null
+++ b/extras/kafka.connect/accumulo-it/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.kafka.connect.parent</artifactId>
+        <version>4.0.0-incubating-SNAPSHOT</version>
+    </parent>
+    
+    <artifactId>rya.kafka.connect.accumulo.it</artifactId>
+
+    <name>Apache Rya Kafka Connect - Accumulo Integration Tests</name>
+    <description>Tests the Kafka Connect Sink that writes to a Rya instance backed by Accumulo.</description>
+    
+    <dependencies>
+        <!-- 1st party dependencies. -->
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.kafka.connect.accumulo</artifactId>
+        </dependency>
+        
+        <!-- 3rd party dependencies. -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        
+        <!-- Testing dependencies. -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.test.accumulo</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java b/extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java
new file mode 100644
index 0000000..1775a74
--- /dev/null
+++ b/extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.test.accumulo.AccumuloITBase;
+import org.junit.Test;
+
+/**
+ * Integration tests for the methods of {@link AccumuloRyaSinkTask}.
+ */
+public class AccumuloRyaSinkTaskIT extends AccumuloITBase {
+
+    @Test
+    public void instanceExists() throws Exception {
+        // Install an instance of Rya.
+        final String ryaInstanceName = getRyaInstanceName();
+        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                getUsername(),
+                getPassword().toCharArray(),
+                getInstanceName(),
+                getZookeepers());
+
+        final InstallConfiguration installConfig = InstallConfiguration.builder()
+                .setEnableTableHashPrefix(false)
+                .setEnableEntityCentricIndex(false)
+                .setEnableFreeTextIndex(false)
+                .setEnableTemporalIndex(false)
+                .setEnablePcjIndex(false)
+                .setEnableGeoIndex(false)
+                .build();
+
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+        ryaClient.getInstall().install(ryaInstanceName, installConfig);
+
+        // Create the task that will be tested.
+        final AccumuloRyaSinkTask task = new AccumuloRyaSinkTask();
+
+        try {
+            // Configure the task to use the embedded Accumulo instance for Rya.
+            final Map<String, String> config = new HashMap<>();
+            config.put(AccumuloRyaSinkConfig.ZOOKEEPERS, getZookeepers());
+            config.put(AccumuloRyaSinkConfig.CLUSTER_NAME, getInstanceName());
+            config.put(AccumuloRyaSinkConfig.USERNAME, getUsername());
+            config.put(AccumuloRyaSinkConfig.PASSWORD, getPassword());
+            config.put(AccumuloRyaSinkConfig.RYA_INSTANCE_NAME, ryaInstanceName);
+
+            // This will pass because the Rya instance exists.
+            task.start(config);
+
+        } finally {
+            task.stop();
+        }
+    }
+
+    @Test(expected = ConnectException.class)
+    public void instanceDoesNotExist() throws Exception {
+        // Create the task that will be tested.
+        final AccumuloRyaSinkTask task = new AccumuloRyaSinkTask();
+
+        try {
+            // Configure the task to use the embedded Accumulo instance for Rya.
+            final Map<String, String> config = new HashMap<>();
+            config.put(AccumuloRyaSinkConfig.ZOOKEEPERS, getZookeepers());
+            config.put(AccumuloRyaSinkConfig.CLUSTER_NAME, getInstanceName());
+            config.put(AccumuloRyaSinkConfig.USERNAME, getUsername());
+            config.put(AccumuloRyaSinkConfig.PASSWORD, getPassword());
+            config.put(AccumuloRyaSinkConfig.RYA_INSTANCE_NAME, getRyaInstanceName());
+
+            // Starting the task will fail because the Rya instance does not exist.
+            task.start(config);
+
+        } finally {
+            task.stop();
+        }
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/accumulo/README.md b/extras/kafka.connect/accumulo/README.md
new file mode 100644
index 0000000..eecfd21
--- /dev/null
+++ b/extras/kafka.connect/accumulo/README.md
@@ -0,0 +1,23 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+This project is the Rya Kafka Connect Sink that writes to Accumulo backed 
+instances of Rya.
+
+This project produces a shaded jar that may be installed into Kafka Connect. 
+For more information about how to install and configure this connector, see
+[the manual](../../rya.manual/src/site/markdown/kafka-connect-integration.md).
\ No newline at end of file
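
As a quick illustration of the configuration step, the sketch below assembles the sink's settings in Java using the property keys this patch defines in `AccumuloRyaSinkConfig` and `RyaSinkConfig` (the unit test in this patch does essentially the same thing). All of the values shown here, such as hosts, credentials, and the Rya instance name, are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.rya.kafka.connect.accumulo.AccumuloRyaSinkConfig;
import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;

public class AccumuloSinkConfigExample {
    public static void main(final String[] args) {
        // Placeholder values; a real deployment points at its own Accumulo cluster and Rya instance.
        final Map<String, String> props = new HashMap<>();
        props.put(AccumuloRyaSinkConfig.ZOOKEEPERS, "zoo1:2181,zoo2:2181");
        props.put(AccumuloRyaSinkConfig.CLUSTER_NAME, "accumulo");
        props.put(AccumuloRyaSinkConfig.USERNAME, "rya_user");
        props.put(AccumuloRyaSinkConfig.PASSWORD, "rya_password");
        props.put(RyaSinkConfig.RYA_INSTANCE_NAME, "rya_");

        // Parsing the map validates the values against AccumuloRyaSinkConfig.CONFIG_DEF.
        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(props);
        System.out.println("Cluster: " + config.getClusterName());
    }
}
```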
diff --git a/extras/kafka.connect/accumulo/pom.xml b/extras/kafka.connect/accumulo/pom.xml
new file mode 100644
index 0000000..54188db
--- /dev/null
+++ b/extras/kafka.connect/accumulo/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.kafka.connect.parent</artifactId>
+        <version>4.0.0-incubating-SNAPSHOT</version>
+    </parent>
+    
+    <artifactId>rya.kafka.connect.accumulo</artifactId>
+
+    <name>Apache Rya Kafka Connect - Accumulo</name>
+    <description>A Kafka Connect Sink that writes to a Rya instance backed by Accumulo.</description>
+    
+    <dependencies>
+        <!-- 1st party dependencies. -->
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.kafka.connect.api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.indexing</artifactId>
+        </dependency>
+        
+        <!-- 3rd party dependencies. -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        
+        <!-- Testing dependencies. -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    
+    <build>
+        <plugins>
+            <!-- Build the uber jar that may be deployed to Kafka Connect. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfig.java b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfig.java
new file mode 100644
index 0000000..8db4f1c
--- /dev/null
+++ b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfig.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka Connect configuration that is used to configure {@link AccumuloRyaSinkConnector}s
+ * and {@link AccumuloRyaSinkTask}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkConfig extends RyaSinkConfig {
+
+    public static final String ZOOKEEPERS = "accumulo.zookeepers";
+    private static final String ZOOKEEPERS_DOC = "A comma delimited list of the Zookeeper server hostname/port pairs.";
+
+    public static final String CLUSTER_NAME = "accumulo.cluster.name";
+    private static final String CLUSTER_NAME_DOC = "The name of the Accumulo instance within Zookeeper.";
+
+    public static final String USERNAME = "accumulo.username";
+    private static final String USERNAME_DOC = "The Accumulo username the Sail connections will use.";
+
+    public static final String PASSWORD = "accumulo.password";
+    private static final String PASSWORD_DOC = "The Accumulo password the Sail connections will use.";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(ZOOKEEPERS, Type.STRING, Importance.HIGH, ZOOKEEPERS_DOC)
+            .define(CLUSTER_NAME, Type.STRING, Importance.HIGH, CLUSTER_NAME_DOC)
+            .define(USERNAME, Type.STRING, Importance.HIGH, USERNAME_DOC)
+            .define(PASSWORD, Type.PASSWORD, Importance.HIGH, PASSWORD_DOC);
+    static {
+        RyaSinkConfig.addCommonDefinitions(CONFIG_DEF);
+    }
+
+    /**
+     * Constructs an instance of {@link AccumuloRyaSinkConfig}.
+     *
+     * @param originals - The key/value pairs that define the configuration. (not null)
+     */
+    public AccumuloRyaSinkConfig(final Map<?, ?> originals) {
+        super(CONFIG_DEF, requireNonNull(originals));
+    }
+
+    /**
+     * @return A comma delimited list of the Zookeeper server hostname/port pairs.
+     */
+    public String getZookeepers() {
+        return super.getString(ZOOKEEPERS);
+    }
+
+    /**
+     * @return The name of the Accumulo instance within Zookeeper.
+     */
+    public String getClusterName() {
+        return super.getString(CLUSTER_NAME);
+    }
+
+    /**
+     * @return The Accumulo username the Sail connections will use.
+     */
+    public String getUsername() {
+        return super.getString(USERNAME);
+    }
+
+    /**
+     * @return The Accumulo password the Sail connections will use.
+     */
+    public String getPassword() {
+        return super.getPassword(PASSWORD).value();
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java
new file mode 100644
index 0000000..eeb3d75
--- /dev/null
+++ b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkConnector extends RyaSinkConnector {
+
+    @Nullable
+    private AccumuloRyaSinkConfig config = null;
+
+    @Override
+    public void start(final Map<String, String> props) {
+        requireNonNull(props);
+        this.config = new AccumuloRyaSinkConfig( props );
+    }
+
+    @Override
+    protected AbstractConfig getConfig() {
+        if(config == null) {
+            throw new IllegalStateException("The configuration has not been set yet. Invoke start(Map) first.");
+        }
+        return config;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return AccumuloRyaSinkTask.class;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return AccumuloRyaSinkConfig.CONFIG_DEF;
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
new file mode 100644
index 0000000..7d19f29
--- /dev/null
+++ b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.log.LogUtils;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+        // Connect to the instance of Accumulo.
+        final Connector connector;
+        try {
+            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+        } catch (final AccumuloException | AccumuloSecurityException e) {
+            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+        }
+
+        // Use a RyaClient to see if the configured instance exists.
+        try {
+            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                    config.getUsername(),
+                    config.getPassword().toCharArray(),
+                    config.getClusterName(),
+                    config.getZookeepers());
+            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+                throw new ConnectException("The Rya Instance named " +
+                        LogUtils.clean(config.getRyaInstanceName()) + " has not been installed.");
+            }
+
+        } catch (final RyaClientException e) {
+            throw new ConnectException("Unable to determine if the Rya Instance named " +
+                    LogUtils.clean(config.getRyaInstanceName()) + " has been installed.", e);
+        }
+    }
+
+    @Override
+    protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+        // Move the configuration into a Rya Configuration object.
+        final AccumuloRdfConfiguration ryaConfig = new AccumuloRdfConfiguration();
+        ryaConfig.setTablePrefix( config.getRyaInstanceName() );
+        ryaConfig.setAccumuloZookeepers( config.getZookeepers() );
+        ryaConfig.setAccumuloInstance( config.getClusterName() );
+        ryaConfig.setAccumuloUser( config.getUsername() );
+        ryaConfig.setAccumuloPassword( config.getPassword() );
+
+        // Create the Sail object.
+        try {
+            return RyaSailFactory.getInstance(ryaConfig);
+        } catch (SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) {
+            throw new ConnectException("Could not connect to the Rya Instance named " + config.getRyaInstanceName(), e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java b/extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java
new file mode 100644
index 0000000..66ecd87
--- /dev/null
+++ b/extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link AccumuloRyaSinkConfig}.
+ */
+public class AccumuloRyaSinkConfigTest {
+
+    @Test
+    public void parses() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put(AccumuloRyaSinkConfig.ZOOKEEPERS, "zoo1:2181,zoo2");
+        properties.put(AccumuloRyaSinkConfig.CLUSTER_NAME, "test");
+        properties.put(AccumuloRyaSinkConfig.USERNAME, "alice");
+        properties.put(AccumuloRyaSinkConfig.PASSWORD, "alice1234!@");
+        properties.put(RyaSinkConfig.RYA_INSTANCE_NAME, "rya_");
+        new AccumuloRyaSinkConfig(properties);
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/api/README.md b/extras/kafka.connect/api/README.md
new file mode 100644
index 0000000..777fd2a
--- /dev/null
+++ b/extras/kafka.connect/api/README.md
@@ -0,0 +1,20 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+This project contains the common components of a Rya Kafka Connect Sink. Each
+backend database that Rya runs on top of must provide a sink implementation built
+from this project's components.
\ No newline at end of file
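
To make that concrete, here is a rough, hypothetical sketch of the task class a new backend would supply. The hook methods and their signatures are inferred from the Accumulo implementation later in this patch (`checkRyaInstanceExists` and `makeSail`, overridden from `RyaSinkTask`); `MyDbRyaSinkTask` and its package are made-up names, and a real backend would also provide `RyaSinkConfig` and `RyaSinkConnector` subclasses, as the Accumulo module does.

```java
package org.apache.rya.kafka.connect.mydb; // hypothetical backend package

import java.util.Map;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
import org.eclipse.rdf4j.sail.Sail;

/**
 * A hypothetical sink task for a new Rya backend, mirroring the hooks that
 * AccumuloRyaSinkTask overrides in this patch.
 */
public class MyDbRyaSinkTask extends RyaSinkTask {

    @Override
    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
        // Connect to the backing database and throw a ConnectException if the
        // configured Rya instance has not been installed there yet.
    }

    @Override
    protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
        // Build a backend specific Rya configuration and return a Sail for it,
        // typically via RyaSailFactory.getInstance(...).
        throw new ConnectException("Not implemented in this sketch.");
    }
}
```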
diff --git a/extras/kafka.connect/api/pom.xml b/extras/kafka.connect/api/pom.xml
new file mode 100644
index 0000000..3727394
--- /dev/null
+++ b/extras/kafka.connect/api/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.kafka.connect.parent</artifactId>
+        <version>4.0.0-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>rya.kafka.connect.api</artifactId>
+
+    <name>Apache Rya Kafka Connect - API</name>
+    <description>Contains common components used when implementing a Kafka Connect Sink
+                 that writes to a Rya instance.</description>
+
+    <dependencies>
+        <!-- 1st party dependencies. -->
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.api.model</artifactId>
+        </dependency>
+    
+        <!-- 3rd party dependencies. -->
+        <dependency>
+            <groupId>org.eclipse.rdf4j</groupId>
+            <artifactId>rdf4j-rio-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.rdf4j</groupId>
+            <artifactId>rdf4j-rio-binary</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.rdf4j</groupId>
+            <artifactId>rdf4j-rio-datatypes</artifactId>
+        </dependency>        
+        <dependency>
+            <groupId>com.github.stephenc.findbugs</groupId>
+            <artifactId>findbugs-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jcabi</groupId>
+            <artifactId>jcabi-manifests</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.rdf4j</groupId>
+            <artifactId>rdf4j-runtime</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        
+        <!-- Testing dependencies. -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsConverter.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsConverter.java
new file mode 100644
index 0000000..eb4b611
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsConverter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+import org.eclipse.rdf4j.model.Statement;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A plugin into the Kafka Connect platform that converts {@link Set}s of {@link Statement}s
+ * to/from byte[]s by using a {@link StatementsSerializer} and a {@link StatementsDeserializer}.
+ * <p/>
+ * This converter does not use Kafka's Schema Registry.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsConverter implements Converter {
+
+    private static final StatementsSerializer SERIALIZER = new StatementsSerializer();
+    private static final StatementsDeserializer DESERIALIZER = new StatementsDeserializer();
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        // This converter's behavior cannot be tuned with configurations.
+    }
+
+    @Override
+    public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
+        requireNonNull(value);
+        return SERIALIZER.serialize(topic, (Set<Statement>) value);
+    }
+
+    @Override
+    public SchemaAndValue toConnectData(final String topic, final byte[] value) {
+        requireNonNull(value);
+        return new SchemaAndValue(null, DESERIALIZER.deserialize(topic, value));
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java
new file mode 100644
index 0000000..fb03347
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory;
+import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio Binary format serialized
+ * set of {@link Statement}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsDeserializer implements Deserializer<Set<Statement>> {
+    private static final Logger log = LoggerFactory.getLogger(StatementsDeserializer.class);
+
+    private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory();
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        // Nothing to do.
+    }
+
+    @Override
+    public Set<Statement> deserialize(final String topic, final byte[] data) {
+        if(data == null || data.length == 0) {
+            // Return null because that is the contract of this method.
+            return null;
+        }
+
+        try {
+            final RDFParser parser = PARSER_FACTORY.getParser();
+            final Set<Statement> statements = new HashSet<>();
+
+            parser.setRDFHandler(new AbstractRDFHandler() {
+                @Override
+                public void handleStatement(final Statement statement) throws RDFHandlerException {
+                    log.debug("Statement: " + statement);
+                    statements.add( statement );
+                }
+            });
+
+            parser.parse(new ByteArrayInputStream(data), null);
+            return statements;
+
+        } catch(final RDFParseException | RDFHandlerException | IOException e) {
+            log.error("Could not deserialize a Set of VisibilityStatement objects using the RDF4J Rio Binary format.", e);
+            return null;
+        }
+    }
+
+    @Override
+    public void close() {
+        // Nothing to do.
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java
new file mode 100644
index 0000000..f2101d6
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Provides a {@link Serializer} and {@link Deserializer} for {@link Statement}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsSerde implements Serde<Set<Statement>> {
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        // Nothing to do.
+    }
+
+    @Override
+    public Serializer<Set<Statement>> serializer() {
+        return new StatementsSerializer();
+    }
+
+    @Override
+    public Deserializer<Set<Statement>> deserializer() {
+        return new StatementsDeserializer();
+    }
+
+    @Override
+    public void close() {
+        // Nothing to do.
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java
new file mode 100644
index 0000000..893df0c
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
+ * using the RDF4J Rio Binary format.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsSerializer implements Serializer<Set<Statement>> {
+    private static final Logger log = LoggerFactory.getLogger(StatementsSerializer.class);
+
+    private static final BinaryRDFWriterFactory WRITER_FACTORY = new BinaryRDFWriterFactory();
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        // Nothing to do.
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final Set<Statement> data) {
+        if(data == null) {
+            // Returning null because that is the contract of this method.
+            return null;
+        }
+
+        // Write the statements using a Binary RDF Writer.
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final RDFWriter writer = WRITER_FACTORY.getWriter(baos);
+        writer.startRDF();
+
+        for(final Statement stmt : data) {
+            // Write the statement.
+            log.debug("Writing Statement: " + stmt);
+            writer.handleStatement(stmt);
+        }
+        writer.endRDF();
+
+        // Return the byte[] version of the data.
+        return baos.toByteArray();
+    }
+
+    @Override
+    public void close() {
+        // Nothing to do.
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConfig.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConfig.java
new file mode 100644
index 0000000..5c3e2cc
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConfig.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Contains common configuration fields for the Rya Sinks.
+ */
+@DefaultAnnotation(NonNull.class)
+public class RyaSinkConfig extends AbstractConfig {
+
+    public static final String RYA_INSTANCE_NAME = "rya.instance.name";
+    private static final String RYA_INSTANCE_NAME_DOC = "The name of the Rya instance that will be connected to.";
+
+    /**
+     * @param configDef - The configuration schema definition that will be updated to include
+     *   this configuration's fields. (not null)
+     */
+    public static void addCommonDefinitions(final ConfigDef configDef) {
+        requireNonNull(configDef);
+        configDef.define(RYA_INSTANCE_NAME, Type.STRING, Importance.HIGH, RYA_INSTANCE_NAME_DOC);
+    }
+
+    /**
+     * Constructs an instance of {@link RyaSinkConfig}.
+     *
+     * @param definition - Defines the schema of the configuration. (not null)
+     * @param originals - The key/value pairs that define the configuration. (not null)
+     */
+    public RyaSinkConfig(final ConfigDef definition, final Map<?, ?> originals) {
+        super(definition, originals);
+    }
+
+    /**
+     * @return The name of the Rya instance that will be connected to.
+     */
+    public String getRyaInstanceName() {
+        return super.getString(RYA_INSTANCE_NAME);
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java
new file mode 100644
index 0000000..f288af2
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Handles the common components required to configure {@link RyaSinkTask}s that write to Rya.
+ * <p>
+ * Implementations of this class only need to specify functionality that is specific to the Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkConnector extends SinkConnector {
+
+    /**
+     * Get the configuration that will be provided to the tasks when {@link #taskConfigs(int)} is invoked.
+     * <p>
+     * Only called after {@link #start(Map)} has been invoked.
+     *
+     * @return The configuration object for the connector.
+     * @throws IllegalStateException Thrown if {@link SinkConnector#start(Map)} has not been invoked yet.
+     */
+    protected abstract AbstractConfig getConfig() throws IllegalStateException;
+
+    @Override
+    public String version() {
+        return Manifests.exists("Build-Version") ? Manifests.read("Build-Version") : "UNKNOWN";
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(final int maxTasks) {
+        final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
+        for(int i = 0; i < maxTasks; i++) {
+            configs.add( getConfig().originalsStrings() );
+        }
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+        // Nothing to do since the RyaSinkConnector has no background monitoring.
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
new file mode 100644
index 0000000..5ff118a
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ * <p>
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class);
+
+    @Nullable
+    private SailRepository sailRepo = null;
+
+    @Nullable
+    private SailRepositoryConnection conn = null;
+
+    /**
+     * Throws an exception if the configured Rya Instance is not already installed
+     * within the configured database.
+     *
+     * @param taskConfig - The configuration values that were provided to the task. (not null)
+     * @throws ConnectException The configured Rya Instance is not installed in the configured database,
+     *   or it could not be determined whether it is installed.
+     */
+    protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
+
+    /**
+     * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured
+     * Rya Instance.
+     *
+     * @param taskConfig - Configures how the Sail object will be created. (not null)
+     * @return The created Sail object.
+     * @throws ConnectException The Sail object could not be made.
+     */
+    protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
+
+    @Override
+    public String version() {
+        return Manifests.exists("Build-Version") ? Manifests.read("Build-Version") : "UNKNOWN";
+    }
+
+    @Override
+    public void start(final Map<String, String> props) throws ConnectException {
+        requireNonNull(props);
+
+        // Ensure the configured Rya Instance is installed within the configured database.
+        checkRyaInstanceExists(props);
+
+        // Create the Sail object that is connected to the Rya Instance.
+        final Sail sail = makeSail(props);
+        sailRepo = new SailRepository( sail );
+        conn = sailRepo.getConnection();
+    }
+
+    @Override
+    public void put(final Collection<SinkRecord> records) {
+        requireNonNull(records);
+
+        // Return immediately if there are no records to handle.
+        if(records.isEmpty()) {
+            return;
+        }
+
+        // If a transaction has not been started yet, then start one.
+        if(!conn.isActive()) {
+            conn.begin();
+        }
+
+        // Iterate through the records and write them to the Sail object.
+        for(final SinkRecord record : records) {
+            // If everything has been configured correctly, then the record's value will be a Set<Statement>.
+            conn.add((Set<? extends Statement>) record.value());
+        }
+    }
+
+    @Override
+    public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+        requireNonNull(currentOffsets);
+        // Commit the current transaction so that the buffered Statements are flushed to the Sail.
+        conn.commit();
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if(conn != null) {
+                conn.close();
+            }
+        } catch(final Exception e) {
+            log.error("Could not close the Sail Repository Connection.", e);
+        }
+
+        try {
+            if(sailRepo != null) {
+                sailRepo.shutDown();
+            }
+        } catch(final Exception e) {
+            log.error("Could not shut down the Sail Repository.", e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/StatementsSerdeTest.java b/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/StatementsSerdeTest.java
new file mode 100644
index 0000000..01e5b76
--- /dev/null
+++ b/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/StatementsSerdeTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.model.ValueFactory;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests the methods of {@link StatementsSerde}.
+ */
+public class StatementsSerdeTest {
+
+    @Test
+    public void serializeAndDeserialize() {
+        // Create the object that will be serialized.
+        final ValueFactory vf = SimpleValueFactory.getInstance();
+
+        final Set<Statement> original = Sets.newHashSet(
+                vf.createStatement(
+                        vf.createIRI("urn:alice"),
+                        vf.createIRI("urn:talksTo"),
+                        vf.createIRI("urn:bob"),
+                        vf.createIRI("urn:testGraph")),
+                vf.createStatement(
+                        vf.createIRI("urn:bob"),
+                        vf.createIRI("urn:talksTo"),
+                        vf.createIRI("urn:charlie"),
+                        vf.createIRI("urn:graph2")),
+                vf.createStatement(
+                        vf.createIRI("urn:charlie"),
+                        vf.createIRI("urn:talksTo"),
+                        vf.createIRI("urn:bob"),
+                        vf.createIRI("urn:graph2")),
+                vf.createStatement(
+                        vf.createIRI("urn:alice"),
+                        vf.createIRI("urn:listensTo"),
+                        vf.createIRI("urn:charlie"),
+                        vf.createIRI("urn:testGraph")));
+
+        // Serialize it.
+        try(final Serde<Set<Statement>> serde = new StatementsSerde()) {
+            final byte[] bytes = serde.serializer().serialize("topic", original);
+
+            // Deserialize it.
+            final Set<Statement> deserialized = serde.deserializer().deserialize("topic", bytes);
+
+            // Show the deserialized value matches the original.
+            assertEquals(original, deserialized);
+        }
+    }
+
+    @Test
+    public void deserializeEmptyData() {
+        try(final Serde<Set<Statement>> serde = new StatementsSerde()) {
+            assertNull( serde.deserializer().deserialize("topic", new byte[0]) );
+        }
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTaskTest.java b/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTaskTest.java
new file mode 100644
index 0000000..e90042d
--- /dev/null
+++ b/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTaskTest.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.eclipse.rdf4j.common.iteration.CloseableIteration;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.model.ValueFactory;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailConnection;
+import org.eclipse.rdf4j.sail.SailException;
+import org.eclipse.rdf4j.sail.memory.MemoryStore;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests the methods of {@link RyaSinkTask}.
+ */
+public class RyaSinkTaskTest {
+
+    /**
+     * A {@link RyaSinkTask} used to test against an in memory Sail instance.
+     */
+    private static final class InMemoryRyaSinkTask extends RyaSinkTask {
+
+        private Sail sail = null;
+
+        @Override
+        protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
+            // Do nothing. Always assume the Rya Instance exists.
+        }
+
+        @Override
+        protected Sail makeSail(final Map<String, String> taskConfig) {
+            if(sail == null) {
+                sail = new MemoryStore();
+                sail.initialize();
+            }
+            return sail;
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void start_ryaInstanceDoesNotExist() {
+        // Create the task that will be tested.
+        final RyaSinkTask task = new RyaSinkTask() {
+            @Override
+            protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
+                throw new IllegalStateException("It doesn't exist.");
+            }
+
+            @Override
+            protected Sail makeSail(final Map<String, String> taskConfig) { return null; }
+        };
+
+        // Since the rya instance does not exist, this will throw an exception.
+        task.start(new HashMap<>());
+    }
+
+    @Test
+    public void singleRecord() {
+        // Create the Statements that will be put by the task.
+        final ValueFactory vf = SimpleValueFactory.getInstance();
+        final Set<Statement> statements = Sets.newHashSet(
+                vf.createStatement(
+                        vf.createIRI("urn:Alice"),
+                        vf.createIRI("urn:WorksAt"),
+                        vf.createIRI("urn:Taco Shop"),
+                        vf.createIRI("urn:graph1")),
+                vf.createStatement(
+                        vf.createIRI("urn:Bob"),
+                        vf.createIRI("urn:TalksTo"),
+                        vf.createIRI("urn:Charlie"),
+                        vf.createIRI("urn:graph2")),
+                vf.createStatement(
+                        vf.createIRI("urn:Eve"),
+                        vf.createIRI("urn:ListensTo"),
+                        vf.createIRI("urn:Alice"),
+                        vf.createIRI("urn:graph1")));
+
+        // Create the task that will be tested.
+        final InMemoryRyaSinkTask task = new InMemoryRyaSinkTask();
+
+        // Set up the properties that will be used to configure the task. Nothing needs to be set here because
+        // checkRyaInstanceExists(...) is a no-op and an in-memory RDF store is used.
+        final Map<String, String> props = new HashMap<>();
+
+        try {
+            // Start the task.
+            task.start(props);
+
+            // Put the statements as a SinkRecord.
+            task.put( Collections.singleton(new SinkRecord("topic", 1, null, "key", null, statements, 0)) );
+
+            // Flush the statements.
+            task.flush(new HashMap<>());
+
+            // Fetch the stored Statements to show they match the original set.
+            final Set<Statement> fetched = new HashSet<>();
+
+            final Sail sail = task.makeSail(props);
+            try(SailConnection conn = sail.getConnection();
+                    CloseableIteration<? extends Statement, SailException> it = conn.getStatements(null, null, null, false)) {
+                while(it.hasNext()) {
+                    fetched.add( it.next() );
+                }
+            }
+
+            assertEquals(statements, fetched);
+
+        } finally {
+            // Stop the task.
+            task.stop();
+        }
+    }
+
+    @Test
+    public void multipleRecords() {
+        // Create the Statements that will be put by the task.
+        final ValueFactory vf = SimpleValueFactory.getInstance();
+        final Set<Statement> batch1 = Sets.newHashSet(
+                vf.createStatement(
+                        vf.createIRI("urn:Alice"),
+                        vf.createIRI("urn:WorksAt"),
+                        vf.createIRI("urn:Taco Shop"),
+                        vf.createIRI("urn:graph1")),
+                vf.createStatement(
+                        vf.createIRI("urn:Bob"),
+                        vf.createIRI("urn:TalksTo"),
+                        vf.createIRI("urn:Charlie"),
+                        vf.createIRI("urn:graph2")));
+
+        final Set<Statement> batch2 = Sets.newHashSet(
+                vf.createStatement(
+                        vf.createIRI("urn:Eve"),
+                        vf.createIRI("urn:ListensTo"),
+                        vf.createIRI("urn:Alice"),
+                        vf.createIRI("urn:graph1")));
+
+        // Create the task that will be tested.
+        final InMemoryRyaSinkTask task = new InMemoryRyaSinkTask();
+
+        // Set up the properties that will be used to configure the task. Nothing needs to be set here because
+        // checkRyaInstanceExists(...) is a no-op and an in-memory RDF store is used.
+        final Map<String, String> props = new HashMap<>();
+
+        try {
+            // Start the task.
+            task.start(props);
+
+            // Put the statements as SinkRecords.
+            final Collection<SinkRecord> records = Sets.newHashSet(
+                    new SinkRecord("topic", 1, null, "key", null, batch1, 0),
+                    new SinkRecord("topic", 1, null, "key", null, batch2, 1));
+            task.put( records );
+
+            // Flush the statements.
+            task.flush(new HashMap<>());
+
+            // Fetch the stored Statements to show they match the original set.
+            final Set<Statement> fetched = new HashSet<>();
+
+            final Sail sail = task.makeSail(props);
+            try(SailConnection conn = sail.getConnection();
+                    CloseableIteration<? extends Statement, SailException> it = conn.getStatements(null, null, null, false)) {
+                while(it.hasNext()) {
+                    fetched.add( it.next() );
+                }
+            }
+
+            assertEquals(Sets.union(batch1, batch2), fetched);
+
+        } finally {
+            // Stop the task.
+            task.stop();
+        }
+    }
+
+    @Test
+    public void flushBetweenPuts() {
+        // Create the Statements that will be put by the task.
+        final ValueFactory vf = SimpleValueFactory.getInstance();
+        final Set<Statement> batch1 = Sets.newHashSet(
+                vf.createStatement(
+                        vf.createIRI("urn:Alice"),
+                        vf.createIRI("urn:WorksAt"),
+                        vf.createIRI("urn:Taco Shop"),
+                        vf.createIRI("urn:graph1")),
+                vf.createStatement(
+                        vf.createIRI("urn:Bob"),
+                        vf.createIRI("urn:TalksTo"),
+                        vf.createIRI("urn:Charlie"),
+                        vf.createIRI("urn:graph2")));
+
+        final Set<Statement> batch2 = Sets.newHashSet(
+                vf.createStatement(
+                        vf.createIRI("urn:Eve"),
+                        vf.createIRI("urn:ListensTo"),
+                        vf.createIRI("urn:Alice"),
+                        vf.createIRI("urn:graph1")));
+
+        // Create the task that will be tested.
+        final InMemoryRyaSinkTask task = new InMemoryRyaSinkTask();
+
+        // Set up the properties that will be used to configure the task. Nothing needs to be set here because
+        // checkRyaInstanceExists(...) is a no-op and an in-memory RDF store is used.
+        final Map<String, String> props = new HashMap<>();
+
+        try {
+            // Start the task.
+            task.start(props);
+
+            // Put the statements with flushes between them.
+            task.put( Collections.singleton(new SinkRecord("topic", 1, null, "key", null, batch1, 0)) );
+            task.flush(new HashMap<>());
+            task.put( Collections.singleton(new SinkRecord("topic", 1, null, "key", null, batch2, 1)) );
+            task.flush(new HashMap<>());
+
+            // Fetch the stored Statements to show they match the original set.
+            final Set<Statement> fetched = new HashSet<>();
+
+            final Sail sail = task.makeSail(props);
+            try(SailConnection conn = sail.getConnection();
+                    CloseableIteration<? extends Statement, SailException> it = conn.getStatements(null, null, null, false)) {
+                while(it.hasNext()) {
+                    fetched.add( it.next() );
+                }
+            }
+
+            assertEquals(Sets.union(batch1, batch2), fetched);
+
+        } finally {
+            // Stop the task.
+            task.stop();
+        }
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/api/src/test/resources/simplelogger.properties b/extras/kafka.connect/api/src/test/resources/simplelogger.properties
new file mode 100644
index 0000000..1b21312
--- /dev/null
+++ b/extras/kafka.connect/api/src/test/resources/simplelogger.properties
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+org.slf4j.simpleLogger.defaultLogLevel=debug
diff --git a/extras/kafka.connect/client/README.md b/extras/kafka.connect/client/README.md
new file mode 100644
index 0000000..c7b8963
--- /dev/null
+++ b/extras/kafka.connect/client/README.md
@@ -0,0 +1,21 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+This project creates a shaded executable jar that may be used to load
+Statements into, and read Statements from, a Kafka topic in the format that
+the Rya Kafka Connect Sinks expect. This tool is only meant to be used for
+testing and debugging Kafka Connect integration.
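+
+A rough usage sketch is shown below. The jar name, bootstrap server, topic,
+and input file are placeholders; the `write` and `read` commands and their
+`-b`, `-t`, and `-f` flags come from this module's command classes.
+
+```sh
+# Write the Statements from an RDF file to a Kafka topic using the sink format.
+java -jar rya.kafka.connect.client-<version>-shaded.jar write \
+    -b localhost:9092 -t statements -f statements.ttl
+
+# Read the Statements back out of the topic and print them to standard out.
+java -jar rya.kafka.connect.client-<version>-shaded.jar read \
+    -b localhost:9092 -t statements
+```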
\ No newline at end of file
diff --git a/extras/kafka.connect/client/pom.xml b/extras/kafka.connect/client/pom.xml
new file mode 100644
index 0000000..1ffc8d6
--- /dev/null
+++ b/extras/kafka.connect/client/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.kafka.connect.parent</artifactId>
+        <version>4.0.0-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>rya.kafka.connect.client</artifactId>
+
+    <name>Apache Rya Kafka Connect - Client</name>
+    <description>Contains a client that may be used to load Statements into 
+                 a Kafka topic to be read by Kafka Connect.</description>
+
+    <dependencies>
+        <!-- 1st party dependencies. -->
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.sail</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.kafka.connect.api</artifactId>
+        </dependency>
+    
+        <!-- 3rd party dependencies. -->
+        <dependency>
+            <groupId>org.eclipse.rdf4j</groupId>
+            <artifactId>rdf4j-model</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.beust</groupId>
+            <artifactId>jcommander</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.github.stephenc.findbugs</groupId>
+            <artifactId>findbugs-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+        
+        <!-- Testing dependencies. -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    
+    <build>
+        <plugins>
+            <!-- Create an executable jar for the client application. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                          <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.apache.rya.kafka.connect.client.CLIDriver</mainClass>
+                                </transformer>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java
new file mode 100644
index 0000000..7ebf083
--- /dev/null
+++ b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.client;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ArgumentsException;
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ExecutionException;
+import org.apache.rya.kafka.connect.client.command.ReadStatementsCommand;
+import org.apache.rya.kafka.connect.client.command.WriteStatementsCommand;
+import org.eclipse.rdf4j.model.Statement;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A CLI tool used to read/write {@link Statement}s to/from a Kafka topic using the format
+ * the Rya Kafka Connect Sinks expect.
+ */
+@DefaultAnnotation(NonNull.class)
+public class CLIDriver {
+
+    /**
+     * Maps from command strings to the object that performs the command.
+     */
+    private static final ImmutableMap<String, RyaKafkaClientCommand> COMMANDS;
+    static {
+        final Set<Class<? extends RyaKafkaClientCommand>> commandClasses = new HashSet<>();
+        commandClasses.add(ReadStatementsCommand.class);
+        commandClasses.add(WriteStatementsCommand.class);
+        final ImmutableMap.Builder<String, RyaKafkaClientCommand> builder = ImmutableMap.builder();
+        for(final Class<? extends RyaKafkaClientCommand> commandClass : commandClasses) {
+            try {
+                final RyaKafkaClientCommand command = commandClass.newInstance();
+                builder.put(command.getCommand(), command);
+            } catch (InstantiationException | IllegalAccessException e) {
+                System.err.println("Could not run the application because a RyaKafkaClientCommand is missing its empty constructor.");
+                e.printStackTrace();
+            }
+        }
+        COMMANDS = builder.build();
+    }
+
+    private static final String USAGE = makeUsage(COMMANDS);
+
+    public static void main(final String[] args) {
+        // If no command provided or the command isn't recognized, then print the usage.
+        if (args.length == 0 || !COMMANDS.containsKey(args[0])) {
+            System.out.println(USAGE);
+            System.exit(1);
+        }
+
+        // Fetch the command that will be executed.
+        final String command = args[0];
+        final String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
+        final RyaKafkaClientCommand clientCommand = COMMANDS.get(command);
+
+        // Print usage if the arguments are invalid for the command.
+        if(!clientCommand.validArguments(commandArgs)) {
+            System.out.println(clientCommand.getUsage());
+            System.exit(1);
+        }
+
+        // Execute the command.
+        try {
+            clientCommand.execute(commandArgs);
+        } catch (ArgumentsException | ExecutionException e) {
+            System.err.println("The command: " + command + " failed to execute properly.");
+            e.printStackTrace();
+            System.exit(2);
+        }
+    }
+
+    private static String makeUsage(final ImmutableMap<String, RyaKafkaClientCommand> commands) {
+        final StringBuilder usage = new StringBuilder();
+        usage.append("Usage: ").append(CLIDriver.class.getSimpleName()).append(" <command> (<argument> ... )\n");
+        usage.append("\n");
+        usage.append("Possible Commands:\n");
+
+        // Sort and find the max width of the commands.
+        final List<String> sortedCommandNames = Lists.newArrayList(commands.keySet());
+        Collections.sort(sortedCommandNames);
+
+        int maxCommandLength = 0;
+        for (final String commandName : sortedCommandNames) {
+            maxCommandLength = commandName.length() > maxCommandLength ? commandName.length() : maxCommandLength;
+        }
+
+        // Add each command to the usage.
+        final String commandFormat = "    %-" + maxCommandLength + "s - %s\n";
+        for (final String commandName : sortedCommandNames) {
+            final String commandDescription = commands.get(commandName).getDescription();
+            usage.append(String.format(commandFormat, commandName, commandDescription));
+        }
+
+        return usage.toString();
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/RyaKafkaClientCommand.java b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/RyaKafkaClientCommand.java
new file mode 100644
index 0000000..8a69a07
--- /dev/null
+++ b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/RyaKafkaClientCommand.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.client;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A command that may be executed by the Rya Kafka Connect Client {@link CLIDriver}.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface RyaKafkaClientCommand {
+
+    /**
+     * Command line parameters that are used by all commands that interact with Kafka.
+     */
+    class KafkaParameters {
+
+        @Parameter(names = { "--bootstrapServers", "-b" }, description =
+                "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.")
+        public String bootstrapServers = "localhost:9092";
+
+        @Parameter(names = { "--topic", "-t" }, required = true, description = "The Kafka topic that will be interacted with.")
+        public String topic;
+    }
+
+    /**
+     * @return What a user would type into the command line to indicate
+     *   they want to execute this command.
+     */
+    public String getCommand();
+
+    /**
+     * @return Briefly describes what the command does.
+     */
+    public String getDescription();
+
+    /**
+     * @return Describes what arguments may be provided to the command.
+     */
+    default public String getUsage() {
+        final JCommander parser = new JCommander(new KafkaParameters());
+
+        final StringBuilder usage = new StringBuilder();
+        parser.usage(usage);
+        return usage.toString();
+    }
+
+    /**
+     * Validates a set of arguments that may be passed into the command.
+     *
+     * @param args - The arguments that will be validated. (not null)
+     * @return {@code true} if the arguments are valid, otherwise {@code false}.
+     */
+    public boolean validArguments(String[] args);
+
+    /**
+     * Execute the command using the command line arguments.
+     *
+     * @param args - Command line arguments that configure how the command will execute. (not null)
+     * @throws ArgumentsException There was a problem with the provided arguments.
+     * @throws ExecutionException There was a problem while executing the command.
+     */
+    public void execute(final String[] args) throws ArgumentsException, ExecutionException;
+
+    /**
+     * A {@link RyaKafkaClientCommand} could not be executed because of a problem with
+     * the arguments that were provided to it.
+     */
+    public static final class ArgumentsException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        public ArgumentsException(final String message) {
+            super(message);
+        }
+
+        public ArgumentsException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+
+    /**
+     * A {@link RyaKafkaClientCommand} could not be executed.
+     */
+    public static final class ExecutionException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        public ExecutionException(final String message) {
+            super(message);
+        }
+
+        public ExecutionException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/ReadStatementsCommand.java b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/ReadStatementsCommand.java
new file mode 100644
index 0000000..bf7a647
--- /dev/null
+++ b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/ReadStatementsCommand.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.client.command;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.rya.kafka.connect.api.StatementsDeserializer;
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand;
+import org.eclipse.rdf4j.model.Statement;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.ParameterException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Reads {@link Statement}s from a Kafka topic using the Rya Kafka Connect Sink format.
+ */
+@DefaultAnnotation(NonNull.class)
+public class ReadStatementsCommand implements RyaKafkaClientCommand {
+
+    @Override
+    public String getCommand() {
+        return "read";
+    }
+
+    @Override
+    public String getDescription() {
+        return "Reads Statements from the specified Kafka topic.";
+    }
+
+    @Override
+    public boolean validArguments(final String[] args) {
+        boolean valid = true;
+        try {
+            new JCommander(new KafkaParameters(), args);
+        } catch(final ParameterException e) {
+            valid = false;
+        }
+        return valid;
+    }
+
+    @Override
+    public void execute(final String[] args) throws ArgumentsException, ExecutionException {
+        requireNonNull(args);
+
+        // Parse the command line arguments.
+        final KafkaParameters params = new KafkaParameters();
+        try {
+            new JCommander(params, args);
+        } catch(final ParameterException e) {
+            throw new ArgumentsException("Could not read the Statements from the topic because of invalid command line parameters.", e);
+        }
+
+        // Set up the consumer.
+        try(KafkaConsumer<String, Set<Statement>> consumer = makeConsumer(params)) {
+            // Subscribe to the configured topic.
+            consumer.subscribe(Collections.singleton(params.topic));
+
+            // Read the statements and write them to output.
+            for(final ConsumerRecord<String, Set<Statement>> record : consumer.poll(500)) {
+                for(final Statement stmt: record.value()) {
+                    System.out.println( stmt );
+                }
+            }
+        }
+    }
+
+    private KafkaConsumer<String, Set<Statement>> makeConsumer(final KafkaParameters params) {
+        requireNonNull(params);
+
+        // Configure which instance of Kafka to connect to.
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, params.bootstrapServers);
+
+        // Nothing meaningful is in the key and the value is a Set<Statement> object.
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StatementsDeserializer.class);
+
+        // Use a UUID for the Group Id so that we never register as part of the same group as another consumer.
+        final String groupId = UUID.randomUUID().toString();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+
+        // Set a client id so that server side logging can be traced.
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "Kafka-Connect-Client-" + groupId);
+
+        // These consumers always start reading from the beginning of the topic and are discarded when the
+        // command finishes, so never commit the consumer's progress.
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+        return new KafkaConsumer<>(props);
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java
new file mode 100644
index 0000000..83311f5
--- /dev/null
+++ b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.client.command;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.kafka.connect.api.StatementsSerializer;
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand;
+import org.apache.rya.rdftriplestore.utils.RdfFormatUtils;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFFormat;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.Rio;
+import org.eclipse.rdf4j.rio.UnsupportedRDFormatException;
+import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Writes {@link Statement}s to a Kafka topic using the Rya Kafka Connect Sink format.
+ */
+@DefaultAnnotation(NonNull.class)
+public class WriteStatementsCommand implements RyaKafkaClientCommand {
+    private static final Logger log = LoggerFactory.getLogger(WriteStatementsCommand.class);
+
+    /**
+     * Command line parameters that are used by this command to configure itself.
+     */
+    public static class WriteParameters extends KafkaParameters {
+        @Parameter(names = {"--statementsFile", "-f"}, required = true, description = "The file of RDF statements to load into Rya Streams.")
+        public String statementsFile;
+    }
+
+    @Override
+    public String getCommand() {
+        return "write";
+    }
+
+    @Override
+    public String getDescription() {
+        return "Writes Statements to the specified Kafka topic.";
+    }
+
+    @Override
+    public boolean validArguments(final String[] args) {
+        boolean valid = true;
+        try {
+            new JCommander(new WriteParameters(), args);
+        } catch(final ParameterException e) {
+            valid = false;
+        }
+        return valid;
+    }
+
+    /**
+     * @return Describes what arguments may be provided to the command.
+     */
+    @Override
+    public String getUsage() {
+        final JCommander parser = new JCommander(new WriteParameters());
+
+        final StringBuilder usage = new StringBuilder();
+        parser.usage(usage);
+        return usage.toString();
+    }
+
+    @Override
+    public void execute(final String[] args) throws ArgumentsException, ExecutionException {
+        requireNonNull(args);
+
+        // Parse the command line arguments.
+        final WriteParameters params = new WriteParameters();
+        try {
+            new JCommander(params, args);
+        } catch(final ParameterException e) {
+            throw new ArgumentsException("Could not stream the query's results because of invalid command line parameters.", e);
+        }
+
+        // Verify the configured statements file path.
+        final Path statementsPath = Paths.get(params.statementsFile);
+        if(!statementsPath.toFile().exists()) {
+            throw new ArgumentsException("Could not load statements at path '" + statementsPath + "' because that " +
+                    "file does not exist. Make sure you've entered the correct path.");
+        }
+
+        // Create an RDF Parser whose format is derived from the statementPath's file extension.
+        final String filename = statementsPath.getFileName().toString();
+        final RDFFormat format = RdfFormatUtils.forFileName(filename);
+        if (format == null) {
+            throw new UnsupportedRDFormatException("Unknown RDF format for the file: " + filename);
+        }
+        final RDFParser parser = Rio.createParser(format);
+
+        // Set up the producer.
+        try(Producer<String, Set<Statement>> producer = makeProducer(params)) {
+            // Set a handler that writes the statements to the specified Kafka topic. It writes batches of 5 Statements.
+            parser.setRDFHandler(new AbstractRDFHandler() {
+
+                private Set<Statement> batch = new HashSet<>(5);
+
+                @Override
+                public void startRDF() throws RDFHandlerException {
+                    log.trace("Starting loading statements.");
+                }
+
+                @Override
+                public void handleStatement(final Statement stmnt) throws RDFHandlerException {
+                    log.trace("Adding statement.");
+                    batch.add(stmnt);
+
+                    if(batch.size() == 5) {
+                        flushBatch();
+                    }
+                }
+
+                @Override
+                public void endRDF() throws RDFHandlerException {
+                    if(!batch.isEmpty()) {
+                        flushBatch();
+                    }
+                    log.trace("Done.");
+                }
+
+                private void flushBatch() {
+                    log.trace("Flushing batch of size " + batch.size());
+                    producer.send(new ProducerRecord<>(params.topic, null, batch));
+                    batch = new HashSet<>(5);
+                    producer.flush();
+                }
+            });
+
+            // Do the parse and load.
+            try {
+                parser.parse(Files.newInputStream(statementsPath), "");
+            } catch (RDFParseException | RDFHandlerException | IOException e) {
+                throw new ExecutionException("Could not load the RDF file's Statements into the Kafka topic.", e);
+            }
+        }
+    }
+
+    private static Producer<String, Set<Statement>> makeProducer(final KafkaParameters params) {
+        requireNonNull(params);
+        final Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, params.bootstrapServers);
+        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StatementsSerializer.class.getName());
+        return new KafkaProducer<>(props);
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/client/src/main/resources/log4j.properties b/extras/kafka.connect/client/src/main/resources/log4j.properties
new file mode 100644
index 0000000..b07468c
--- /dev/null
+++ b/extras/kafka.connect/client/src/main/resources/log4j.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Root logger option
+log4j.rootLogger=INFO, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
diff --git a/extras/kafka.connect/mongo-it/README.md b/extras/kafka.connect/mongo-it/README.md
new file mode 100644
index 0000000..b154b95
--- /dev/null
+++ b/extras/kafka.connect/mongo-it/README.md
@@ -0,0 +1,19 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+This project contains integration tests that verify a Mongo DB backed 
+implementation of the Rya Kafka Connect Sink is working properly.
\ No newline at end of file
diff --git a/extras/kafka.connect/mongo-it/pom.xml b/extras/kafka.connect/mongo-it/pom.xml
new file mode 100644
index 0000000..ca439ea
--- /dev/null
+++ b/extras/kafka.connect/mongo-it/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.kafka.connect.parent</artifactId>
+        <version>4.0.0-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>rya.kafka.connect.mongo.it</artifactId>
+
+    <name>Apache Rya Kafka Connect - Mongo DB Integration Tests</name>
+    <description>Tests the Kafka Connect Sink that writes to a Rya instance backed by Mongo DB.</description>
+
+    <dependencies>
+        <!-- 1st party dependencies. -->
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.kafka.connect.mongo</artifactId>
+        </dependency>
+
+        <!-- 3rd party dependencies. -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Testing dependencies. -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.test.mongo</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java b/extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java
new file mode 100644
index 0000000..55e7603
--- /dev/null
+++ b/extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.test.mongo.MongoITBase;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link MongoRyaSinkTask}.
+ */
+public class MongoRyaSinkTaskIT extends MongoITBase {
+
+    @Test
+    public void instanceExists() throws Exception {
+        // Install an instance of Rya.
+        final String ryaInstanceName = "rya";
+        final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+                super.getMongoHostname(),
+                super.getMongoPort(),
+                Optional.empty(),
+                Optional.empty());
+
+        final InstallConfiguration installConfig = InstallConfiguration.builder()
+                .setEnableTableHashPrefix(false)
+                .setEnableEntityCentricIndex(false)
+                .setEnableFreeTextIndex(false)
+                .setEnableTemporalIndex(false)
+                .setEnablePcjIndex(false)
+                .setEnableGeoIndex(false)
+                .build();
+
+        final RyaClient ryaClient = MongoRyaClientFactory.build(connectionDetails, super.getMongoClient());
+        ryaClient.getInstall().install(ryaInstanceName, installConfig);
+
+        // Create the task that will be tested.
+        final MongoRyaSinkTask task = new MongoRyaSinkTask();
+
+        try {
+            // Configure the task to use the embedded Mongo DB instance for Rya.
+            final Map<String, String> config = new HashMap<>();
+            config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname());
+            config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
+            config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "rya");
+
+            // This will pass because the Rya instance exists.
+            task.start(config);
+        } finally {
+            task.stop();
+        }
+    }
+
+    @Test(expected = ConnectException.class)
+    public void instanceDoesNotExist() throws Exception {
+        // Create the task that will be tested.
+        final MongoRyaSinkTask task = new MongoRyaSinkTask();
+
+        try {
+            // Configure the task to use the embedded Mongo DB instance for Rya.
+            final Map<String, String> config = new HashMap<>();
+            config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname());
+            config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
+            config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "instance-does-not-exist");
+
+            // Starting the task will fail because the Rya instance does not exist.
+            task.start(config);
+        } finally {
+            task.stop();
+        }
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/mongo/README.md b/extras/kafka.connect/mongo/README.md
new file mode 100644
index 0000000..03b2c9b
--- /dev/null
+++ b/extras/kafka.connect/mongo/README.md
@@ -0,0 +1,23 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+This project is the Rya Kafka Connect Sink that writes to Mongo DB backed 
+instances of Rya.
+
+This project produces a shaded jar that may be installed into Kafka Connect. 
+For more information about how to install and configure this connector, see
+[the manual](../../rya.manual/src/site/markdown/kafka-connect-integration.md).
\ No newline at end of file
diff --git a/extras/kafka.connect/mongo/pom.xml b/extras/kafka.connect/mongo/pom.xml
new file mode 100644
index 0000000..66eba1b
--- /dev/null
+++ b/extras/kafka.connect/mongo/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.kafka.connect.parent</artifactId>
+        <version>4.0.0-incubating-SNAPSHOT</version>
+    </parent>
+    
+    <artifactId>rya.kafka.connect.mongo</artifactId>
+
+    <name>Apache Rya Kafka Connect - Mongo DB</name>
+    <description>A Kafka Connect Sink that writes to a Rya instance backed by Mongo DB.</description>
+    
+    <dependencies>
+        <!-- 1st party dependencies. -->
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.kafka.connect.api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.indexing</artifactId>
+        </dependency>
+        
+        <!-- 3rd party dependencies. -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        
+        <!-- Testing dependencies. -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    
+    <build>
+        <plugins>
+            <!-- Build the uber jar that may be deployed to Kafka Connect. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java
new file mode 100644
index 0000000..3b48556
--- /dev/null
+++ b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka Connect configuration that is used to configure {@link MongoRyaSinkConnector}s and {@link MongoRyaSinkTask}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkConfig extends RyaSinkConfig {
+
+    public static final String HOSTNAME = "mongo.hostname";
+    private static final String HOSTNAME_DOC = "The Mongo DB hostname the Sail connections will use.";
+
+    public static final String PORT = "mongo.port";
+    private static final String PORT_DOC = "The Mongo DB port the Sail connections will use.";
+
+    public static final String USERNAME = "mongo.username";
+    private static final String USERNAME_DOC = "The Mongo DB username the Sail connections will use.";
+
+    public static final String PASSWORD = "mongo.password";
+    private static final String PASSWORD_DOC = "The Mongo DB password the Sail connections will use.";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(HOSTNAME, Type.STRING, Importance.HIGH, HOSTNAME_DOC)
+            .define(PORT, Type.INT, Importance.HIGH, PORT_DOC)
+            .define(USERNAME, Type.STRING, "", Importance.HIGH, USERNAME_DOC)
+            .define(PASSWORD, Type.PASSWORD, "", Importance.HIGH, PASSWORD_DOC);
+    static {
+        RyaSinkConfig.addCommonDefinitions(CONFIG_DEF);
+    }
+
+    /**
+     * Constructs an instance of {@link MongoRyaSinkConfig}.
+     *
+     * @param originals - The key/value pairs that define the configuration. (not null)
+     */
+    public MongoRyaSinkConfig(final Map<?, ?> originals) {
+        super(CONFIG_DEF, originals);
+    }
+
+    /**
+     * @return The Mongo DB hostname the Sail connections will use.
+     */
+    public String getHostname() {
+        return super.getString(HOSTNAME);
+    }
+
+    /**
+     * @return The Mongo DB port the Sail connections will use.
+     */
+    public int getPort() {
+        return super.getInt(PORT);
+    }
+
+    /**
+     * @return The Mongo DB username the Sail connections will use.
+     */
+    public String getUsername() {
+        return super.getString(USERNAME);
+    }
+
+    /**
+     * @return The Mongo DB password the Sail connections will use.
+     */
+    public String getPassword() {
+        return super.getPassword(PASSWORD).value();
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java
new file mode 100644
index 0000000..fd91d07
--- /dev/null
+++ b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses a Mongo DB Rya backend when creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkConnector extends RyaSinkConnector {
+
+    @Nullable
+    private MongoRyaSinkConfig config = null;
+
+    @Override
+    public void start(final Map<String, String> props) {
+        this.config = new MongoRyaSinkConfig( props );
+    }
+
+    @Override
+    protected AbstractConfig getConfig() {
+        if(config == null) {
+            throw new IllegalStateException("The configuration has not been set yet. Invoke start(Map) first.");
+        }
+        return config;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return MongoRyaSinkTask.class;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return MongoRyaSinkConfig.CONFIG_DEF;
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java
new file mode 100644
index 0000000..6887fdb
--- /dev/null
+++ b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.log.LogUtils;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import com.google.common.base.Strings;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+        @Nullable
+        final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
+        @Nullable
+        final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
+
+        // Connect a Mongo Client to the configured Mongo DB instance.
+        final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
+        final boolean hasCredentials = username != null && password != null;
+
+        try(MongoClient mongoClient = hasCredentials ?
+                new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
+                new MongoClient(serverAddr)) {
+            // Use a RyaClient to see if the configured instance exists.
+            // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
+            final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+                    config.getHostname(),
+                    config.getPort(),
+                    Optional.ofNullable(username),
+                    Optional.ofNullable(password));
+
+            final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
+            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+                throw new ConnectException("The Rya Instance named " +
+                        LogUtils.clean(config.getRyaInstanceName()) + " has not been installed.");
+            }
+        } catch(final RyaClientException e) {
+            throw new ConnectException("Unable to determine if the Rya Instance named " +
+                    LogUtils.clean(config.getRyaInstanceName()) + " has been installed.", e);
+        }
+    }
+
+    @Override
+    protected Sail makeSail(final Map<String, String> taskConfig) {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+
+        // Move the configuration into a Rya Configuration object.
+        final MongoDBRdfConfiguration ryaConfig = new MongoDBRdfConfiguration();
+        ConfigUtils.setUseMongo(ryaConfig, true);
+        ryaConfig.setMongoDBName( config.getRyaInstanceName() );
+        ryaConfig.setTablePrefix( config.getRyaInstanceName() );
+        ryaConfig.setMongoHostname( config.getHostname() );
+        ryaConfig.setMongoPort( "" + config.getPort() );
+
+        if(!Strings.isNullOrEmpty(config.getUsername()) && !Strings.isNullOrEmpty(config.getPassword())) {
+            ryaConfig.setMongoUser( config.getUsername() );
+            ryaConfig.setMongoPassword( config.getPassword() );
+        }
+
+        // Create the Sail object.
+        try {
+            return RyaSailFactory.getInstance(ryaConfig);
+        } catch (SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) {
+            throw new ConnectException("Could not connect to the Rya Instance named " + config.getRyaInstanceName(), e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/mongo/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfigTest.java b/extras/kafka.connect/mongo/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfigTest.java
new file mode 100644
index 0000000..d6c7c96
--- /dev/null
+++ b/extras/kafka.connect/mongo/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfigTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link MongoRyaSinkConfig}.
+ */
+public class MongoRyaSinkConfigTest {
+
+    @Test
+    public void parses() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put(MongoRyaSinkConfig.HOSTNAME, "127.0.0.1");
+        properties.put(MongoRyaSinkConfig.PORT, "27017");
+        properties.put(MongoRyaSinkConfig.USERNAME, "alice");
+        properties.put(MongoRyaSinkConfig.PASSWORD, "alice1234!@");
+        properties.put(RyaSinkConfig.RYA_INSTANCE_NAME, "rya");
+        new MongoRyaSinkConfig(properties);
+    }
+}
\ No newline at end of file
diff --git a/extras/kafka.connect/pom.xml b/extras/kafka.connect/pom.xml
new file mode 100644
index 0000000..9a9702c
--- /dev/null
+++ b/extras/kafka.connect/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.extras</artifactId>
+        <version>4.0.0-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>rya.kafka.connect.parent</artifactId>
+
+    <name>Apache Rya Kafka Connect Parent</name>
+    <description>The parent pom file for any Rya Kafka Connect project.</description>
+
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>api</module>
+        <module>accumulo</module>
+        <module>accumulo-it</module>
+        <module>mongo</module>
+        <module>mongo-it</module>
+        <module>client</module>
+    </modules>
+
+    <properties>
+        <kafka.version>1.1.0</kafka.version>
+    </properties>
+    
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifestEntries>
+                            <Build-Version>${project.version}</Build-Version>
+                        </manifestEntries>
+                    </archive>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/extras/pom.xml b/extras/pom.xml
index bb6f914..65dd4cc 100644
--- a/extras/pom.xml
+++ b/extras/pom.xml
@@ -45,6 +45,7 @@
         <module>rya.merger</module>
         <module>rya.streams</module>
         <module>rya.forwardchain</module>
+        <module>kafka.connect</module>
     </modules>
 
     <profiles>
diff --git a/extras/rya.manual/src/site/markdown/_index.md b/extras/rya.manual/src/site/markdown/_index.md
index d0b61c4..07dfe50 100644
--- a/extras/rya.manual/src/site/markdown/_index.md
+++ b/extras/rya.manual/src/site/markdown/_index.md
@@ -33,6 +33,7 @@
 - [Inferencing](infer.md)
 - [MapReduce Interface](mapreduce.md)
 - [Rya Streams](rya-streams.md)
+- [Kafka Connect Integration](kafka-connect-integration.md)
 
 # Samples
 - [Typical First Steps](sm-firststeps.md)
diff --git a/extras/rya.manual/src/site/markdown/index.md b/extras/rya.manual/src/site/markdown/index.md
index 5372618..e686736 100644
--- a/extras/rya.manual/src/site/markdown/index.md
+++ b/extras/rya.manual/src/site/markdown/index.md
@@ -35,6 +35,7 @@
 - [Shell Interface](shell.md)
 - [Incremental Join Maintenance Application (PCJ Updater)](pcj-updater.md)
 - [Rya Streams](rya-streams.md)
+- [Kafka Connect Integration](kafka-connect-integration.md)
 
 # Samples
 - [Typical First Steps](sm-firststeps.md)
diff --git a/extras/rya.manual/src/site/markdown/kafka-connect-integration.md b/extras/rya.manual/src/site/markdown/kafka-connect-integration.md
new file mode 100644
index 0000000..ed31c33
--- /dev/null
+++ b/extras/rya.manual/src/site/markdown/kafka-connect-integration.md
@@ -0,0 +1,493 @@
+<!--
+[comment]: # Licensed to the Apache Software Foundation (ASF) under one
+[comment]: # or more contributor license agreements.  See the NOTICE file
+[comment]: # distributed with this work for additional information
+[comment]: # regarding copyright ownership.  The ASF licenses this file
+[comment]: # to you under the Apache License, Version 2.0 (the
+[comment]: # "License"); you may not use this file except in compliance
+[comment]: # with the License.  You may obtain a copy of the License at
+[comment]: # 
+[comment]: #   http://www.apache.org/licenses/LICENSE-2.0
+[comment]: # 
+[comment]: # Unless required by applicable law or agreed to in writing,
+[comment]: # software distributed under the License is distributed on an
+[comment]: # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+[comment]: # KIND, either express or implied.  See the License for the
+[comment]: # specific language governing permissions and limitations
+[comment]: # under the License.
+-->
+
+# Kafka Connect Integration #
+
+Introduced in 4.0.0
+
+# Table of Contents #
+- [Introduction](#introduction)
+- [An Important Note About Deploying the Plugins](#an-important-note-about-deploying-the-plugins)
+- [Statement Serialization Format](#statement-serialization-format)
+- [Quick Start](#quick-start)
+- [Future Work](#future-work)
+
+<div id='introduction'/>
+
+## Introduction ##
+
+[Kafka Connect](https://kafka.apache.org/documentation/#connect) is a system 
+that is able to pull data from data sources into Kafka topics as well as write 
+data from Kafka topics into data sinks. This project implements a Kafka 
+Connector Sink for both Accumulo backed and Mongo backed instances of Rya. 
+
+<div id='an-important-note-about-deploying-the-plugins'/>
+
+## An Important Note About Deploying the Plugins ##
+
+While testing the application with both the Mongo Rya Sink and Accumulo Rya Sink
+uber jars installed, we were seeing ClassCastExceptions being thrown when some 
+code was trying to cast a ContextStatement into a Statement. Normally, this 
+wouldn't cause a problem. However, within Connect, this was caused by both uber
+jars containing a copy of the ContextStatement and Statement classes. Different
+Classloaders loaded each of those classes and the relationship between them
+was lost.
+
+For now, it's important that you only deploy one of the uber jars at a time.
+
+<div id='statement-serialization-format'/>
+
+## Statement Serialization Format ##
+
+Applications that would like to write to a Kafka topic using the format that
+the sink is able to recognize must write ```Set<Statement>``` objects by using the 
+[StatementsSerializer](../../../../../extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java).
+
+Rather than using the Confluent Schema Registry and Avro to serialize Statements, 
+we're going with RDF4J's Rio Binary Format. You may read more about how that 
+format is implemented [here](http://docs.rdf4j.org/rdf4j-binary/).
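+
+For example, a producer that writes a small batch of Statements might be configured
+as sketched below. This is a minimal illustration, not part of the shipped code: the
+class name, topic name, bootstrap servers, and example statements are placeholders,
+while the `StatementsSerializer` is the value serializer that applies the Rio Binary
+encoding.
+
+```
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.kafka.connect.api.StatementsSerializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.model.ValueFactory;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+
+public class StatementsProducerExample {
+    public static void main(final String[] args) {
+        // Configure a producer whose values are serialized using the Rio Binary format.
+        final Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StatementsSerializer.class.getName());
+
+        try(Producer<String, Set<Statement>> producer = new KafkaProducer<>(props)) {
+            // Build a small batch of Statements to write to the topic.
+            final ValueFactory vf = SimpleValueFactory.getInstance();
+            final Set<Statement> batch = new HashSet<>();
+            batch.add(vf.createStatement(vf.createIRI("urn:Alice"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Bob")));
+
+            // Write the batch to the "statements" topic. The key is unused by the sink.
+            producer.send(new ProducerRecord<>("statements", null, batch));
+            producer.flush();
+        }
+    }
+}
+```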
+          
+<div id='quick-start'/>
+
+## Quick Start ##
+
+This tutorial demonstrates how to install and start the Accumulo Rya Sink and 
+the Mongo Rya Sink by using the Open Source version of the Confluent platform.
+You can download it [here](https://www.confluent.io/download/). We're going to
+use the standalone version of Connect, but in a production environment you may
+want to use the distributed mode if there is a lot of data that needs to be 
+inserted.
+
+We suggest you go through the 
+[Confluent Platform Open Source Quick Start](https://docs.confluent.io/current/quickstart/cos-quickstart.html),
+so that you can ensure the Confluent platform is installed and ready for use. 
+We're using Confluent 4.1.0 in this tutorial, so be aware some things may change 
+when using newer versions of the platform. You may also find it beneficial to 
+go through the [Kafka Connect Quick Start](https://docs.confluent.io/current/connect/quickstart.html) 
+as well to see how Kafka Connect works in general.
+
+### Step 1: Download the applications ###
+
+You can fetch the artifacts you need to follow this Quick Start from our
+[downloads page](http://rya.apache.org/download/). Click on the release of
+interest and follow the "Central repository for Maven and other dependency
+managers" URL.
+
+Fetch the following four artifacts:
+
+Artifact Id | Type 
+--- | ---
+rya.shell | shaded jar
+rya.kafka.connect.client | shaded jar
+rya.kafka.connect.accumulo | shaded jar
+rya.kafka.connect.mongo | shaded jar
+
+### Step 2: Load statements into a Kafka topic ###
+
+The sink connector that we will be demonstrating reads Statements from
+a Kafka topic and loads them into an instance of Rya. Just to keep things simple,
+let's create a topic that only has a single partition and is unreplicated. In
+a production environment, you will want to tune these values based on how many
+concurrent workers you would like to use when processing input. 
+
+```
+kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic statements
+```
+
+Next we need to create a file that contains the statements we will load into the topic.
+Name the file "quickstart-statements.nt" and use a text editor to write the following lines to it:
+
+```
+<urn:Alice> <urn:talksTo> <urn:Bob> .
+<urn:Bob> <urn:talksTo> <urn:Alice> .
+<urn:Bob> <urn:talksTo> <urn:Charlie> .
+<urn:Charlie> <urn:talksTo> <urn:Alice> .
+<urn:David> <urn:talksTo> <urn:Eve> .
+<urn:Eve> <urn:listensTo> <urn:Bob> .
+```
+
+Use the ```rya.kafka.connect.client``` to write the file's contents to the topic we just made.
+
+```
+java -jar rya.kafka.connect.client-4.0.0-incubating-shaded.jar write -f quickstart-statements.nt -t statements
+```
+ 
+You may verify the statements were written by using the read command.
+
+```
+java -jar rya.kafka.connect.client-4.0.0-incubating-shaded.jar read -t statements
+```
+
+At this point you need to decide whether you are going to use an Accumulo or
+Mongo DB backed instance of Rya. The following steps are pretty much the same
+for both backends, but they require different jars and commands. To follow
+the Accumulo set of steps, start with __Accumulo Step 3__ and go through
+__Accumulo Step 5__. To follow the Mongo set of steps, skip ahead
+to __Mongo Step 3__ and go through __Mongo Step 5__.
+
+### Accumulo Step 3: Installing a Rya instance ###
+
+The sink needs a place to put the Statements that we just wrote to the Kafka topic.
+We're going to have it write to a Rya instance named "quickstart" on your Accumulo
+cluster. To do this, you'll need to use the Rya Shell. Here's roughly what an
+installation session should look like.
+
+```
+java -jar rya.shell-4.0.0-incubating-shaded.jar
+
+ _____                _____ _          _ _
+|  __ \              / ____| |        | | |
+| |__) |   _  __ _  | (___ | |__   ___| | |
+|  _  / | | |/ _` |  \___ \| '_ \ / _ \ | |
+| | \ \ |_| | (_| |  ____) | | | |  __/ | |
+|_|  \_\__, |\__,_| |_____/|_| |_|\___|_|_|
+        __/ |
+       |___/
+4.0.0-incubating
+
+Welcome to the Rya Shell.
+
+Execute one of the connect commands to start interacting with an instance of Rya.
+You may press tab at any time to see which of the commands are available.
+
+rya>connect-accumulo --zookeepers localhost --instanceName quickstart_instance --username quickstart
+Password: *******
+
+rya/quickstart_instance> install
+Rya Instance Name [default: rya_]: quickstart
+Use Shard Balancing (improves streamed input write speeds) [default: false]: f
+Use Entity Centric Indexing [default: true]: f
+Use Free Text Indexing [default: true]: f
+Use Temporal Indexing [default: true]: f
+Use Precomputed Join Indexing [default: true]: f
+
+A Rya instance will be installed using the following values:
+   Instance Name: quickstart
+   Use Shard Balancing: false
+   Use Entity Centric Indexing: false
+   Use Free Text Indexing: false
+   Use Temporal Indexing: false
+   Use Precomputed Join Indexing: false
+
+Continue with the install? (y/n) y
+The Rya instance named 'quickstart' has been installed.
+
+```
+
+We also want to ensure the instance we just installed does not have any Statements
+yet. We can do this using that same shell instance.
+
+```
+# Verify no data has been inserted yet.
+rya/quickstart_instance> connect-rya --instance quickstart
+rya/quickstart_instance:quickstart> sparql-query
+Enter a SPARQL Query.
+Type '\e' to execute the current query.
+Type '\c' to clear the current query.
+SPARQL> select * where { ?s ?p ?o .}\e
+Executing Query...
+Query Results:
+
+rya/quickstart_instance:quickstart> 
+```
+
+### Accumulo Step 4: Installing and running the Accumulo Rya Sink ###
+
+At this point we have a Kafka topic that is filled with RDF Statements that
+need to be loaded into Rya. We also have a Rya instance for them to be written
+to. All that is left is to install the Accumulo Rya Sink, configure it to 
+use those two endpoints, and then load it.
+
+The version of the Confluent platform we used for this quick start doesn't seem
+to be able to find new connector installs dynamically, so start by shutting 
+everything down.
+
+```
+confluent stop
+```
+
+Install the shaded jar that contains the Accumulo Rya Sink connector.
+
+```
+mkdir confluent-4.1.0/share/java/kafka-connect-rya-accumulo
+cp rya.kafka.connect.accumulo-4.0.0-incubating-shaded.jar confluent-4.1.0/share/java/kafka-connect-rya-accumulo
+```
+
+Then we need to configure the connector to read from the "statements" topic,
+specify which Accumulo cluster is hosting the Rya Instance, which Rya Instance 
+to write to, and specify the classes that define the Connector and the Converter 
+to use. This file has clear text passwords in it, so ensure it has appropriate 
+access restrictions applied to it. 
+
+```
+touch confluent-4.1.0/etc/kafka/connect-rya-accumulo-sink.properties
+```
+
+And then use your favorite text editor to fill in the following values:
+
+```
+name=rya-accumulo-sink
+connector.class=org.apache.rya.kafka.connect.accumulo.AccumuloRyaSinkConnector
+tasks.max=1
+value.converter=org.apache.rya.kafka.connect.api.StatementsConverter
+topics=statements
+accumulo.zookeepers=127.0.0.1
+accumulo.cluster.name=<your cluster name here>
+accumulo.username=<your username here>
+accumulo.password=<your password here>
+rya.instance.name=quickstart
+```
+
+Start the Confluent platform:
+
+```
+confluent start
+```
+
+Even after the start command says everything is started, it may take a moment for
+the load command to work. Rerun this command until you get a response from the 
+REST service printed to the screen and confluent reports it as loaded:
+
+```
+confluent load rya-accumulo-sink -d confluent-4.1.0/etc/kafka/connect-rya-accumulo-sink.properties
+```
+
+The connector will automatically start workers that load the data from the 
+configured topic into the configured Rya instance.
+
+### Accumulo Step 5: Verify statements were written to Rya ###
+
+At this point you should be able to rerun the query from __Accumulo Step 3__ and
+see that Statements have been added to the Rya instance.
+
+```
+rya/quickstart_instance:quickstart> sparql-query
+Enter a SPARQL Query.
+Type '\e' to execute the current query.
+Type '\c' to clear the current query.
+SPARQL> select * where { ?s ?p ?o . }\e
+Executing Query...
+Query Results:
+p,s,o
+urn:talksTo,urn:Alice,urn:Bob
+urn:talksTo,urn:Bob,urn:Alice
+urn:talksTo,urn:Bob,urn:Charlie
+urn:talksTo,urn:Charlie,urn:Alice
+urn:talksTo,urn:David,urn:Eve
+urn:listensTo,urn:Eve,urn:Bob
+Done.
+```
+
+### Mongo Step 3: Installing a Rya instance ###
+
+The sink needs a place to put the Statements that we just wrote to the Kafka topic.
+We're going to have it write to a Rya instance named "quickstart" within your 
+Mongo database. To do this, you'll need to use the Rya Shell. Here's roughly 
+what an installation session should look like.
+
+```
+[root@localhost ~]# java -jar rya.shell-4.0.0-incubating-SNAPSHOT-shaded.jar
+ _____                _____ _          _ _
+|  __ \              / ____| |        | | |
+| |__) |   _  __ _  | (___ | |__   ___| | |
+|  _  / | | |/ _` |  \___ \| '_ \ / _ \ | |
+| | \ \ |_| | (_| |  ____) | | | |  __/ | |
+|_|  \_\__, |\__,_| |_____/|_| |_|\___|_|_|
+        __/ |
+       |___/
+4.0.0-incubating-SNAPSHOT
+
+Welcome to the Rya Shell.
+
+Execute one of the connect commands to start interacting with an instance of Rya.
+You may press tab at any time to see which of the commands are available.
+
+rya> connect-mongo --hostname localhost --port 27017
+Connected. You must select a Rya instance to interact with next.
+
+rya/localhost> install
+Rya Instance Name [default: rya_]: quickstart
+Use Free Text Indexing [default: true]: f
+Use Temporal Indexing [default: true]: f
+Use PCJ Indexing [default: true]: f
+
+A Rya instance will be installed using the following values:
+   Instance Name: quickstart
+   Use Free Text Indexing: false
+   Use Temporal Indexing: false
+   Use PCJ Indexing: false
+
+Continue with the install? (y/n) y
+The Rya instance named 'quickstart' has been installed.
+```
+
+We also want to ensure the instance we just installed does not have any Statements
+yet. We can do this using that same shell instance.
+
+```
+rya/localhost> connect-rya --instance quickstart
+rya/localhost:quickstart> select * where { ?s ?p ?o .}\e
+Command 'select * where { ?s ?p ?o .}\e' not found (for assistance press TAB)
+rya/localhost:quickstart> sparql-query
+Enter a SPARQL Query.
+Type '\e' to execute the current query.
+Type '\c' to clear the current query.
+SPARQL> select * where { ?s ?p ?o .}\e
+Executing Query...
+No Results Found.
+Done.
+rya/localhost:quickstart> 
+```
+
+### Mongo Step 4: Installing and running the Mongo Rya Sink ###
+
+At this point we have a Kafka topic that is filled with RDF Statements that
+need to be loaded into Rya. We also have a Rya instance for them to be written
+to. All that is left is to install the Mongo Rya Sink, configure it to 
+use those two endpoints, and then load it.
+
+The version of the Confluent platform we used for this quick start doesn't seem
+to be able to find new connector installs dynamically, so start by shutting
+everything down.
+
+```
+confluent stop
+```
+
+Install the shaded jar that contains the Mongo Rya Sink connector.
+
+```
+mkdir confluent-4.1.0/share/java/kafka-connect-rya-mongo
+cp rya.kafka.connect.mongo-4.0.0-incubating-shaded.jar confluent-4.1.0/share/java/kafka-connect-rya-mongo
+```
+
+Then we need to configure the connector to read from the "statements" topic,
+specify which Mongo database is hosting the Rya Instance, which Rya Instance 
+to write to, and specify the classes that define the Connector and the Converter 
+to use. This file has clear text passwords in it, so ensure it has appropriate 
+access restrictions applied to it. 
+
+```
+touch confluent-4.1.0/etc/kafka/connect-rya-mongo-sink.properties
+```
+
+And then use your favorite text editor to fill in the following values:
+
+```
+name=rya-mongo-sink
+connector.class=org.apache.rya.kafka.connect.mongo.MongoRyaSinkConnector
+tasks.max=1
+value.converter=org.apache.rya.kafka.connect.api.StatementsConverter
+topics=statements
+mongo.hostname=127.0.0.1
+mongo.port=27017
+mongo.username=
+mongo.password=
+rya.instance.name=quickstart
+```
+
+Start the Confluent platform:
+
+```
+confluent start
+```
+
+Even after the start command says everything is started, it may take a moment for
+the load command to work. Rerun this command until you get a response from the 
+REST service printed to the screen and confluent reports it as loaded:
+
+```
+confluent load rya-mongo-sink -d confluent-4.1.0/etc/kafka/connect-rya-mongo-sink.properties
+```
+
+The connector will automatically start workers that load the data from the 
+configured topic into the configured Rya instance.
+
+### Mongo Step 5: Verify statements were written to Rya ###
+
+At this point you should be able to rerun the query from __Mongo Step 3__ and
+see that statements have been added to the Rya instance.
+
+```
+rya/localhost:quickstart> sparql-query
+Enter a SPARQL Query.
+Type '\e' to execute the current query.
+Type '\c' to clear the current query.
+SPARQL> select * where { ?s ?p ?o . }\e
+Executing Query...
+Query Results:
+p,s,o
+urn:talksTo,urn:Alice,urn:Bob
+urn:talksTo,urn:Bob,urn:Charlie
+urn:talksTo,urn:Charlie,urn:Alice
+urn:talksTo,urn:Bob,urn:Alice
+urn:talksTo,urn:David,urn:Eve
+urn:listensTo,urn:Eve,urn:Bob
+Done.
+```
+
+<div id='future-work'/>
+
+## Future Work ##
+
+### Remove passwords from connector configuration files ###
+
+It's a security flaw that the connector's passwords for connecting to Accumulo
+and Mongo are in clear text within the configuration files. The log files hide
+them when they log the configuration, but they are still written to standard out
+when you use the confluent command to load the connector. There should be
+another way for the connector to receive the credentials required to connect.
+
+### Support both Mongo and Accumulo connectors at the same time ###
+
+Currently, you can only use the Mongo Rya Sink or the Accumulo Rya Sink because
+of the problem mentioned in
+[An Important Note About Deploying the Plugins](#an-important-note-about-deploying-the-plugins).
+
+We could have a single uber jar plugin that supports both backends. We could also
+just not use uber jars and include all of the jars that the plugins depend on
+in a single folder.
+
+### Visibility Statement Support ###
+
+The sinks are able to write Statement objects, but that means none of those
+statements are allowed to have visibility expressions. If they do, then the
+visibilities will be dropped when the statement is inserted.
+
+It would be nice if the Rya implementation of the RDF4J RepositoryConnection
+were able to figure out when a Statement is actually a VisibilityStatement. It
+could then retain the visibility expression when it is persisted.
+
+### Visibility Binding Set Sink ###
+
+The Visibility Binding Sets that are stored within the Precomputed Join index
+could be generated by an external system (such as Rya Streams) and written to
+a Kafka topic. It would be convenient to also have a connector that is able to
+write those values to the index.
+
+### Rya Kafka Connect Source ###
+
+Rya Streams would benefit from having a Kafka Connect Source that is able to
+determine when new Statements have been added to the core tables, and then write
+those Visibility Statements to a Kafka topic.
+
+### Geo Indexing ###
+
+It's difficult to get the Geo indexing into the Sail object that represents
+Rya because the geo project is optional. Since it is optional, we don't use
+dependency injection to substitute the GeoRyaSailFactory for the normal Rya
+Sail Factory. An improvement to this project would be to resolve that problem
+so that it may do geo indexing while inserting statements.
\ No newline at end of file
diff --git a/extras/rya.manual/src/site/site.xml b/extras/rya.manual/src/site/site.xml
index 2af3373..a102902 100644
--- a/extras/rya.manual/src/site/site.xml
+++ b/extras/rya.manual/src/site/site.xml
@@ -48,7 +48,8 @@
         <item name="MapReduce Interface" href="mapreduce.html"/>

         <item name="Shell Interface" href="shell.html"/>

         <item name="Incremental Join Maintenance" href="pcj-updater.html"/>

-        <item name="Rya Streams" href="rya-streams.html"/>
+        <item name="Rya Streams" href="rya-streams.html"/>

+        <item name="Kafka Connect Integration" href="kafka-connect-integration.html"/>
     </menu>

 

     <menu name="Samples">

diff --git a/pom.xml b/pom.xml
index 699fccf..b46110f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,6 +136,7 @@
         <jcip.version>1.0-1</jcip.version>
         <kafka.version>0.10.0.1</kafka.version>
         <kryo.version>3.0.3</kryo.version>
+        <jcabi-manifests.version>1.1</jcabi-manifests.version>
         
         <!-- set profile property defaults -->
         <skip.rya.it>true</skip.rya.it>  <!-- modified by  -P enable-it  -->
@@ -555,6 +556,21 @@
                 <version>${project.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.rya</groupId>
+                <artifactId>rya.kafka.connect.api</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.rya</groupId>
+                <artifactId>rya.kafka.connect.accumulo</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.rya</groupId>
+                <artifactId>rya.kafka.connect.mongo</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>org.apache.accumulo</groupId>
                 <artifactId>accumulo-core</artifactId>
                 <version>${accumulo.version}</version>
@@ -564,11 +580,6 @@
                 <artifactId>accumulo-start</artifactId>
                 <version>${accumulo.version}</version>
             </dependency>
-           <dependency>
-                <groupId>org.eclipse.rdf4j</groupId>
-                <artifactId>rdf4j-runtime-osgi</artifactId>
-                <version>${org.eclipse.rdf4j.version}</version>
-            </dependency>
             <dependency>
                 <groupId>org.eclipse.rdf4j</groupId>
                 <artifactId>rdf4j-runtime</artifactId>
@@ -604,6 +615,22 @@
                 <artifactId>rdf4j-queryresultio-text</artifactId>
                 <version>${org.eclipse.rdf4j.version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>org.eclipse.rdf4j</groupId>
+                <artifactId>rdf4j-rio-api</artifactId>
+                <version>${org.eclipse.rdf4j.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.eclipse.rdf4j</groupId>
+                <artifactId>rdf4j-rio-binary</artifactId>
+                <version>${org.eclipse.rdf4j.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.eclipse.rdf4j</groupId>
+                <artifactId>rdf4j-rio-datatypes</artifactId>
+                <version>${org.eclipse.rdf4j.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.eclipse.rdf4j</groupId>
                 <artifactId>rdf4j-rio-nquads</artifactId>
@@ -709,6 +736,11 @@
             </dependency>
             <dependency>
                 <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-simple</artifactId>
+                <version>${slf4j.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.slf4j</groupId>
                 <artifactId>slf4j-log4j12</artifactId>
                 <version>${slf4j.version}</version>
             </dependency>
@@ -1075,6 +1107,11 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.kafka</groupId>
+                <artifactId>connect-api</artifactId>
+                <version>${kafka.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
                 <artifactId>kafka_2.11</artifactId>
                 <version>${kafka.version}</version>
                 <classifier>test</classifier>
@@ -1084,6 +1121,11 @@
                 <artifactId>kryo</artifactId>
                 <version>${kryo.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.jcabi</groupId>
+                <artifactId>jcabi-manifests</artifactId>
+                <version>${jcabi-manifests.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>