Added sink task and sink class
Added end-to-end sink test
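
For orientation while reviewing: the GeodeKafkaSinkTask returned by taskClass() below is not part of this excerpt. A minimal sketch of the shape such a task usually takes is included here, assuming a single region name arrives via the task properties; the "regions" key, the locator address, and the class name are illustrative placeholders, not the actual implementation added in this commit.

    import java.util.Collection;
    import java.util.Map;

    import org.apache.geode.cache.Region;
    import org.apache.geode.cache.client.ClientCache;
    import org.apache.geode.cache.client.ClientCacheFactory;
    import org.apache.geode.cache.client.ClientRegionShortcut;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    // Illustrative only: a bare-bones sink task that writes each Kafka record into a Geode region.
    public class GeodeSinkTaskSketch extends SinkTask {
        private ClientCache clientCache;
        private Region<Object, Object> region;

        @Override
        public void start(Map<String, String> props) {
            // connect to a locator and obtain a PROXY handle to the target region
            // (the "regions" property key and locator address are assumptions for this sketch)
            clientCache = new ClientCacheFactory()
                    .addPoolLocator("localhost", 10334)
                    .create();
            region = clientCache
                    .createClientRegionFactory(ClientRegionShortcut.PROXY)
                    .create(props.get("regions"));
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            // each record becomes a region entry keyed by the Kafka record key
            for (SinkRecord record : records) {
                if (record.key() != null) {
                    region.put(record.key(), record.value());
                }
            }
        }

        @Override
        public void stop() {
            clientCache.close();
        }

        @Override
        public String version() {
            return "unknown";
        }
    }

The real task would presumably pull its settings through GeodeConnectorConfig the same way the source task does.
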
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
index 68460e4..08af2ad 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
@@ -1,76 +1,81 @@
 package geode.kafka.sink;
 
+import geode.kafka.GeodeConnectorConfig;
 import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
-public class GeodeKafkaSink {
+import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
+import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
+import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
+import static geode.kafka.GeodeConnectorConfig.LOCATORS;
+import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
 
-//
-//    /** Sink properties. */
-//    private Map<String, String> configProps;
-//
-//    /** Expected configurations. */
-//    private static final ConfigDef CONFIG_DEF = new ConfigDef();
-//
-//    /** {@inheritDoc} */
-//    @Override public String version() {
-//        return AppInfoParser.getVersion();
-//    }
-//
-//    /**
-//     * A sink lifecycle method. Validates grid-specific sink properties.
-//     *
-//     * @param props Sink properties.
-//     */
-//    @Override public void start(Map<String, String> props) {
-//        configProps = props;
-//
-//        try {
-//            A.notNullOrEmpty(configProps.get(SinkConnector.TOPICS_CONFIG), "topics");
-//            A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_NAME), "cache name");
-//            A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_CFG_PATH), "path to cache config file");
-//        }
-//        catch (IllegalArgumentException e) {
-//            throw new ConnectException("Cannot start IgniteSinkConnector due to configuration error", e);
-//        }
-//    }
-//
-//    /**
-//     * Obtains a sink task class to be instantiated for feeding data into grid.
-//     *
-//     * @return IgniteSinkTask class.
-//     */
-//    @Override public Class<? extends Task> taskClass() {
-//        return IgniteSinkTask.class;
-//    }
-//
-//    /**
-//     * Builds each config for <tt>maxTasks</tt> tasks.
-//     *
-//     * @param maxTasks Max number of tasks.
-//     * @return Task configs.
-//     */
-//    @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
-//        List<Map<String, String>> taskConfigs = new ArrayList<>();
-//        Map<String, String> taskProps = new HashMap<>();
-//
-//        taskProps.putAll(configProps);
-//
-//        for (int i = 0; i < maxTasks; i++)
-//            taskConfigs.add(taskProps);
-//
-//        return taskConfigs;
-//    }
-//
-//    /** {@inheritDoc} */
-//    @Override public void stop() {
-//        // No-op.
-//    }
-//
-//    /** {@inheritDoc} */
-//    @Override public ConfigDef config() {
-//        return CONFIG_DEF;
-//    }
+public class GeodeKafkaSink extends SinkConnector {
+    private static final ConfigDef CONFIG_DEF = new ConfigDef();
+    private Map<String, String> sharedProps;
+
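+    /**
+     * Captures the connector properties, with Geode defaults filled in, so they can be handed to each task.
+     */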
+    @Override
+    public void start(Map<String, String> props) {
+        sharedProps = computeMissingConfigurations(props);
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return GeodeKafkaSinkTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        List<Map<String, String>> taskConfigs = new ArrayList<>();
+
+        for (int i = 0; i < maxTasks; i++) {
+            // each task needs its own copy of the shared properties so the task id is not overwritten
+            Map<String, String> taskProps = new HashMap<>(sharedProps);
+            taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
+            taskConfigs.add(taskProps);
+        }
+
+        return taskConfigs;
+    }
+
+    @Override
+    public void stop() {
+        // No-op.
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public String version() {
+        //TODO
+        return "unknown";
+    }
+
+
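+    /**
+     * Fills in defaults for locators, durable client id/timeout, batch size, queue size,
+     * and CQ prefix when the user has not supplied them.
+     */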
+    private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
+        props.computeIfAbsent(LOCATORS, (key) -> DEFAULT_LOCATOR);
+        props.computeIfAbsent(DURABLE_CLIENT_TIME_OUT, (key) -> DEFAULT_DURABLE_CLIENT_TIMEOUT);
+        props.computeIfAbsent(DURABLE_CLIENT_ID_PREFIX, (key) -> DEFAULT_DURABLE_CLIENT_ID);
+        props.computeIfAbsent(BATCH_SIZE, (key) -> DEFAULT_BATCH_SIZE);
+        props.computeIfAbsent(QUEUE_SIZE, (key) -> DEFAULT_QUEUE_SIZE);
+        props.computeIfAbsent(CQ_PREFIX, (key) -> DEFAULT_CQ_PREFIX);
+        return props;
+    }
 }
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
index d5da62a..36b9942 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
@@ -28,6 +28,7 @@
 public class GeodeKafkaSource extends SourceConnector {
 
   private Map<String, String> sharedProps;
+  //TODO maybe fold this into GeodeConnectorConfig
   private static final ConfigDef CONFIG_DEF = new ConfigDef();
 
 
@@ -40,7 +41,6 @@
   public List<Map<String, String>> taskConfigs(int maxTasks) {
     List<Map<String, String>> taskConfigs = new ArrayList<>();
     Map<String, String> taskProps = new HashMap<>();
-
     taskProps.putAll(sharedProps);
 
     for (int i = 0; i < maxTasks; i++) {
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index 3f2ac80..829eb29 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -69,7 +69,7 @@
             installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix);
         }
         catch (Exception e) {
-            logger.error("Unable to start task", e);
+            logger.error("Unable to start source task", e);
             throw e;
         }
     }
@@ -81,7 +81,7 @@
         if (eventBuffer.drainTo(events, batchSize) > 0) {
             for (GeodeEvent event : events) {
                 for (String topic : topics) {
-                    records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent()));
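+                    // publish only the event's new value, not the whole event object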
+                    records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent().getNewValue()));
                 }
             }
             return records;
diff --git a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
index f1a6dff..f0d7f06 100644
--- a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
@@ -13,12 +13,19 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -26,12 +33,15 @@
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertNotNull;
 
 public class GeodeKafkaTestCluster {
 
@@ -54,10 +64,17 @@
     startKafka();
     startGeode();
     createTopic();
-
     startWorker();
     consumer = createConsumer();
-    Thread.sleep(5000);
+  }
+
+  @Before
+  public void beforeTests() {
+  }
+
+  @After
+  public void afterTests() {
+
   }
 
   @AfterClass
@@ -86,7 +103,7 @@
     Properties topicProperties = new Properties();
     topicProperties.put("flush.messages", "1");
     AdminZkClient adminZkClient = new AdminZkClient(zkClient);
-    adminZkClient.createTopic(TEST_TOPICS,3
+    adminZkClient.createTopic(TEST_TOPICS,1
             ,1, topicProperties, RackAwareMode.Disabled$.MODULE$);
   }
 
@@ -161,6 +178,21 @@
       return consumer;
   }
 
+  // producer props, just for testing
+  public static Producer<String,String> createProducer() {
+    final Properties props = new Properties();
+    props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+            StringSerializer.class.getName());
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+            StringSerializer.class.getName());
+
+    // Create the producer using props.
+    final Producer<String, String> producer =
+            new KafkaProducer<>(props);
+    return producer;
+  }
+
   @Test
   public void endToEndSourceTest() {
     ClientCache client = createGeodeClient();
@@ -179,4 +211,18 @@
     });
   }
 
+  @Test
+  public void endToEndSinkTest() {
+    ClientCache client = createGeodeClient();
+    Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGIONS);
+
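+    // produce records to the test topic; the sink connector should write them through to the Geode region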
+    Producer<String, String> producer = createProducer();
+    for (int i = 0; i < 10; i++) {
+      producer.send(new ProducerRecord<>(TEST_TOPICS, "KEY" + i, "VALUE" + i));
+    }
+
+    await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+      for (int i = 0; i < 10; i++) {
+        assertNotNull(region.get("KEY" + i));
+      }
+    });
+  }
+
 }
diff --git a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
index cc8e27b..d6fc7a6 100644
--- a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
@@ -1,5 +1,6 @@
 package geode.kafka;
 
+import geode.kafka.sink.GeodeKafkaSink;
 import geode.kafka.source.GeodeKafkaSource;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
@@ -62,8 +63,20 @@
         herder.putConnectorConfig(
                 sourceProps.get(ConnectorConfig.NAME_CONFIG),
                 sourceProps, true, (error, result)->{
-                    System.out.println("CALLBACK: " + result + "::: error?" + error);
                 });
 
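+        // configure and register the sink connector with the same standalone herder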
+        Map<String, String> sinkProps = new HashMap<>();
+        sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName());
+        sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
+        sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+        sinkProps.put(REGIONS, TEST_REGIONS);
+        sinkProps.put(TOPICS, TEST_TOPICS);
+
+        herder.putConnectorConfig(
+                sinkProps.get(ConnectorConfig.NAME_CONFIG),
+                sinkProps, true, (error, result)->{
+                });
+
     }
 }