Geode as source to Kafka: hack test successful

Execute the CQ instead of only creating it, pull in the geode-cq
module, and verify end to end that a region put flows through the
source task to a Kafka consumer.
diff --git a/build.gradle b/build.gradle
index 9e23d87..c9500b0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -14,6 +14,7 @@
 dependencies {
 
     compile 'org.apache.geode:geode-core:1.11.0'
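+    // continuous-query support lives in the separate geode-cq module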
+    compile 'org.apache.geode:geode-cq:1.11.0'
     compile(group: 'org.apache.kafka', name: 'connect-api', version: '2.3.1')
     compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.0'
     compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.0'
diff --git a/src/main/java/kafka/GeodeKafkaSourceTask.java b/src/main/java/kafka/GeodeKafkaSourceTask.java
index 1f74700..9608f07 100644
--- a/src/main/java/kafka/GeodeKafkaSourceTask.java
+++ b/src/main/java/kafka/GeodeKafkaSourceTask.java
@@ -8,6 +8,7 @@
 import org.apache.geode.cache.query.CqException;
 import org.apache.geode.cache.query.CqExistsException;
 import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.RegionNotFoundException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 
@@ -38,11 +39,12 @@
 
   @Override
   public void start(Map<String, String> props) {
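+    // hack settings are hard-coded for now; nothing is read from props yet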
+    System.out.println("JASON task start");
     batchSize = 100;
     queueSize = 100000;
     String regionName = "someRegion";
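+    // bounded hand-off between the CQ listener thread and Connect's poll() thread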
     eventBuffer = new LinkedBlockingQueue<>(queueSize);
-    topics = new String[] {"myTopic"};
+    topics = new String[] {"someTopic"};
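+    // must match the topic the test consumer subscribes to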
     sourcePartition = new HashMap<>();
     sourcePartition.put(REGION_NAME, regionName);
 
@@ -50,10 +52,12 @@
     offset.put("OFFSET", 0L);
 
     installOnGeode("localHost", 10334, "someRegion");
+    System.out.println("JASON task start end");
   }
 
   @Override
   public List<SourceRecord> poll() throws InterruptedException {
+//    System.out.println("JASON polling");
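+    // drain up to batchSize buffered CQ events and wrap each one as a SourceRecord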
     ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
     ArrayList<CqEvent> events = new ArrayList<>(batchSize);
     if (eventBuffer.drainTo(events, batchSize) > 0) {
@@ -63,9 +67,11 @@
           records.add(new SourceRecord(sourcePartition, offset, topic, null, event));
       }
 
+      System.out.println("JASON we polled and returning records" + records.size());
       return records;
     }
 
+//    System.out.println("JASON we didn't poll any records");
     return null;
   }
 
@@ -80,16 +86,26 @@
         .setPoolSubscriptionEnabled(true).addPoolLocator(locatorHost, locatorPort).create();
     CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
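+    // the listener below pushes every matching CQ event into eventBuffer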
     cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener());
+    System.out.println("JASON installing on Geode");
     CqAttributes cqAttributes = cqAttributesFactory.create();
     try {
+      System.out.println("JASON installing new cq");
       clientCache.getQueryService().newCq("kafkaCQFor" + regionName, "select * from /" + regionName, cqAttributes,
-          true);
+          true).execute();
+      System.out.println("JASON finished installing cq");
     } catch (CqExistsException e) {
+      System.out.println("UHH");
       e.printStackTrace();
-    } catch (CqException e) {
+    } catch (CqException | RegionNotFoundException e) {
+      System.out.println("UHH e");
       e.printStackTrace();
     }
+    catch (Exception e) {
+      System.out.println("UHHHHHH " + e);
+      e.printStackTrace();
+    }
+    System.out.println("JASON task calling ready for events");
     clientCache.readyForEvents();
+    System.out.println("JASON task ready for events");
   }
 
   private static class GeodeKafkaSourceListener implements CqListener {
@@ -97,6 +113,7 @@
     @Override
     public void onEvent(CqEvent aCqEvent) {
       try {
+        System.out.println("JASON cqEvent and putting into eventBuffer");
         eventBuffer.offer(aCqEvent, 2, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
 
diff --git a/src/test/java/kafka/GeodeKafkaTestCluster.java b/src/test/java/kafka/GeodeKafkaTestCluster.java
index cb5f9ad..2b0293f 100644
--- a/src/test/java/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/kafka/GeodeKafkaTestCluster.java
@@ -10,6 +10,12 @@
 import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
@@ -32,7 +38,10 @@
 import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 
@@ -45,6 +54,8 @@
   private static ZooKeeperLocalCluster zooKeeperLocalCluster;
   private static KafkaLocalCluster kafkaLocalCluster;
   private static GeodeLocalCluster geodeLocalCluster;
+  private static WorkerAndHerderCluster workerAndHerderCluster;
+  private static Consumer<String, String> consumer;
 
   @BeforeClass
   public static void setup() throws IOException, QuorumPeerConfig.ConfigException, InterruptedException {
@@ -52,63 +63,30 @@
     startKafka();
     startGeode();
     createTopic();
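+    // crude waits: give the topic, the connect worker, and the consumer time to settle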
+    Thread.sleep(5000);
     startWorker();
+    consumer = createConsumer();
+    Thread.sleep(5000);
   }
 
   @AfterClass
   public static void shutdown() {
+    workerAndHerderCluster.stop();
     KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000,
             15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
+
     AdminZkClient adminZkClient = new AdminZkClient(zkClient);
     adminZkClient.deleteTopic("someTopic");
+
     kafkaLocalCluster.stop();
     geodeLocalCluster.stop();
   }
 
 
-  private static void startWorker() {
-    Map props = new HashMap();
-    props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8888");
-    props.put("offset.storage.file.filename", "/tmp/connect.offsets");
-    // fast flushing for testing.
-    props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
-
-
-    props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
-    props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
-    props.put("internal.key.converter.schemas.enable", "false");
-    props.put("internal.value.converter.schemas.enable", "false");
-    props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
-    props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
-    props.put("key.converter.schemas.enable", "false");
-    props.put("value.converter.schemas.enable", "false");
-
-
-    WorkerConfig workerCfg = new StandaloneConfig(props);
-
-    MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
-    offBackingStore.configure(workerCfg);
-
-    Worker worker = new Worker("WORKER_ID", new SystemTime(), new Plugins(props), workerCfg, offBackingStore, new AllConnectorClientConfigOverridePolicy());
-    worker.start();
-
-    Herder herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg), new AllConnectorClientConfigOverridePolicy());
-    herder.start();
-
-
-
-
-    Map<String, String> sourceProps = new HashMap<>();
-    sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
-    sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
-    sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
-//
-    herder.putConnectorConfig(
-            sourceProps.get(ConnectorConfig.NAME_CONFIG),
-            sourceProps, true, (error, result)->{
-              System.out.println("CALLBACK: " + result);
-            });
-    System.out.println("Worker and herder started");
+  private static void startWorker() throws IOException, InterruptedException {
+    workerAndHerderCluster = new WorkerAndHerderCluster();
+    workerAndHerderCluster.start();
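+    // give the worker/herder time to start and deploy the connector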
+    Thread.sleep(20000);
   }
 
   private static void createTopic() {
@@ -186,13 +164,40 @@
   }
 
 
+  // consumer props; only used by the test to verify that records arrive
+  public static Consumer<String, String> createConsumer() {
+    final Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8888");
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+            StringDeserializer.class.getName());
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+            StringDeserializer.class.getName());
+    // create the consumer and subscribe to the topic the source task publishes to
+    final Consumer<String, String> consumer = new KafkaConsumer<>(props);
+    consumer.subscribe(Collections.singletonList("someTopic"));
+    return consumer;
+  }
 
   @Test
   public void testX() throws InterruptedException {
     ClientCache client = createGeodeClient();
     Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create("someRegion");
     region.put("JASON KEY", "JASON VALUE");
+    System.out.println("PUT COMPLETE!");
+    region.get("JASON KEY");
+//    client.close();
+
+
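+    // the put above should fire the CQ and flow through the source task to someTopic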
+    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
+    for (ConsumerRecord<String, String> record: records) {
+      System.out.println("JASON we consumed a record:" + record);
+    }
     System.out.println("TEST COMPLETE!");
+
   }
 
 }
diff --git a/src/test/java/kafka/GeodeLocalCluster.java b/src/test/java/kafka/GeodeLocalCluster.java
index fd72dec..43ac8f5 100644
--- a/src/test/java/kafka/GeodeLocalCluster.java
+++ b/src/test/java/kafka/GeodeLocalCluster.java
@@ -13,12 +13,12 @@
     }
 
     public void start() throws IOException, InterruptedException {
+        System.out.println("starting locator");
         locatorProcess.exec("10334");
-        Thread.sleep(30000);
+        Thread.sleep(15000);
         System.out.println("is alive?" + locatorProcess.process.isAlive());
         serverProcess.exec("40404");
         Thread.sleep(30000);
-
     }
 
     public void stop() {