Worker and herder added
Source taskes being instantiated
diff --git a/src/main/java/kafka/GeodeKafkaSourceTask.java b/src/main/java/kafka/GeodeKafkaSourceTask.java
index e6aa578..1f74700 100644
--- a/src/main/java/kafka/GeodeKafkaSourceTask.java
+++ b/src/main/java/kafka/GeodeKafkaSourceTask.java
@@ -42,7 +42,7 @@
     queueSize = 100000;
     String regionName = "someRegion";
     eventBuffer = new LinkedBlockingQueue<>(queueSize);
-    topics = new String[] {"default"};
+    topics = new String[] {"myTopic"};
     sourcePartition = new HashMap<>();
     sourcePartition.put(REGION_NAME, regionName);
 
diff --git a/src/test/java/kafka/GeodeKafkaTestCluster.java b/src/test/java/kafka/GeodeKafkaTestCluster.java
index 824b148..cb5f9ad 100644
--- a/src/test/java/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/kafka/GeodeKafkaTestCluster.java
@@ -1,10 +1,29 @@
 package kafka;
 
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.zk.AdminZkClient;
+import kafka.zk.KafkaZkClient;
+import kafka.zookeeper.ZooKeeperClient;
+import org.I0Itec.zkclient.ZkClient;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -13,6 +32,8 @@
 import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 public class GeodeKafkaTestCluster {
@@ -30,14 +51,73 @@
     startZooKeeper();
     startKafka();
     startGeode();
+    createTopic();
+    startWorker();
   }
 
   @AfterClass
   public static void shutdown() {
+    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
+            15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
+    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
+    adminZkClient.deleteTopic("someTopic");
     kafkaLocalCluster.stop();
     geodeLocalCluster.stop();
   }
 
+
+  private static void startWorker() {
+    Map props = new HashMap();
+    props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8888");
+    props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+    // fast flushing for testing.
+    props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
+
+
+    props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+    props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+    props.put("internal.key.converter.schemas.enable", "false");
+    props.put("internal.value.converter.schemas.enable", "false");
+    props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+    props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+    props.put("key.converter.schemas.enable", "false");
+    props.put("value.converter.schemas.enable", "false");
+
+
+    WorkerConfig workerCfg = new StandaloneConfig(props);
+
+    MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
+    offBackingStore.configure(workerCfg);
+
+    Worker worker = new Worker("WORKER_ID", new SystemTime(), new Plugins(props), workerCfg, offBackingStore, new AllConnectorClientConfigOverridePolicy());
+    worker.start();
+
+    Herder herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg), new AllConnectorClientConfigOverridePolicy());
+    herder.start();
+
+
+
+
+    Map<String, String> sourceProps = new HashMap<>();
+    sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
+    sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
+    sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+//
+    herder.putConnectorConfig(
+            sourceProps.get(ConnectorConfig.NAME_CONFIG),
+            sourceProps, true, (error, result)->{
+              System.out.println("CALLBACK: " + result);
+            });
+    System.out.println("Worker and herder started");
+  }
+
+  private static void createTopic() {
+    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
+            15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
+    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
+    adminZkClient.createTopic("someTopic",3,1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+  }
+
   private ClientCache createGeodeClient() {
     return new ClientCacheFactory().addPoolLocator("localhost", 10334).create();
   }
@@ -87,17 +167,7 @@
 
     //Specifically GeodeKafka connector configs
 
-    /*
-    props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
-        props.put(ConnectorConfig.NAME_CONFIG, "test-src-connector");
-        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSourceConnectorMock.class.getName());
-        props.put(IgniteSourceConstants.CACHE_NAME, "testCache");
-        props.put(IgniteSourceConstants.CACHE_CFG_PATH, "example-ignite.xml");
-        props.put(IgniteSourceConstants.TOPIC_NAMES, topics);
-        props.put(IgniteSourceConstants.CACHE_EVENTS, "put");
-        props.put(IgniteSourceConstants.CACHE_FILTER_CLASS, TestCacheEventFilter.class.getName());
-        props.put(IgniteSourceConstants.INTL_BUF_SIZE, "1000000");
-     */
+
 
 /*
 name=file-source