end to end test "passes". Not sure why we don't get all the events, or why we don't get any events until enough polls have been called.
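
One guess at the missing/late events (not verified): GeodeKafkaSourceTask.poll() returns null whenever eventBuffer happens to be empty, so records only surface on whichever poll lands after the CQ listener has filled the buffer. Blocking briefly for the first event would smooth out delivery; a rough sketch against the new task fields (would need java.util.concurrent.TimeUnit re-imported; untested):

    // sketch: wait briefly for the first event instead of returning null right away
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        GeodeEvent first = eventBuffer.poll(1, TimeUnit.SECONDS);
        if (first == null) {
            return null; // nothing arrived yet; the worker just calls poll() again
        }
        List<GeodeEvent> events = new ArrayList<>(batchSize);
        events.add(first);
        eventBuffer.drainTo(events, batchSize - 1);
        List<SourceRecord> records = new ArrayList<>(batchSize);
        for (GeodeEvent event : events) {
            for (String topic : topics) {
                records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()),
                        OFFSET_DEFAULT, topic, null, event.getEvent()));
            }
        }
        return records;
    }
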
diff --git a/build.gradle b/build.gradle
index c9500b0..87a2c36 100644
--- a/build.gradle
+++ b/build.gradle
@@ -13,8 +13,8 @@
 
 dependencies {
 
-    compile 'org.apache.geode:geode-core:1.11.0'
-    compile 'org.apache.geode:geode-cq:1.11.0'
+    compile 'org.apache.geode:geode-core:1.10.0'
+    compile 'org.apache.geode:geode-cq:1.10.0'
     compile(group: 'org.apache.kafka', name: 'connect-api', version: '2.3.1')
     compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.0'
     compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.0'
@@ -27,4 +27,7 @@
 
     testCompile group: 'junit', name: 'junit', version: '4.12'
 
+    testImplementation 'org.awaitility:awaitility:4.0.2'
+
+
 }
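
Awaitility comes in as a test dependency for the end-to-end test further down; its core idiom, for reference, is retrying a condition until it holds or the timeout throws (assumes the usual imports; conditionHolds is a placeholder, not a real method):

    import static org.awaitility.Awaitility.await;
    import java.util.concurrent.TimeUnit;

    // retries the lambda until it returns true, or throws ConditionTimeoutException after 10s
    await().atMost(10, TimeUnit.SECONDS).until(() -> conditionHolds());
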
diff --git a/src/main/java/kafka/GeodeKafkaSource.java b/src/main/java/kafka/GeodeKafkaSource.java
index 8b88e81..95afc50 100644
--- a/src/main/java/kafka/GeodeKafkaSource.java
+++ b/src/main/java/kafka/GeodeKafkaSource.java
@@ -10,11 +10,23 @@
 import java.util.List;
 import java.util.Map;
 
-public class GeodeKafkaSource extends SourceConnector {
+import static kafka.GeodeConnectorConfig.BATCH_SIZE;
+import static kafka.GeodeConnectorConfig.CQ_PREFIX;
+import static kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
+import static kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
+import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
+import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
+import static kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
+import static kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
+import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
+import static kafka.GeodeConnectorConfig.LOCATORS;
+import static kafka.GeodeConnectorConfig.QUEUE_SIZE;
+import static kafka.GeodeConnectorConfig.REGIONS;
+import static kafka.GeodeConnectorConfig.TOPICS;
+import static kafka.GeodeKafkaSourceTask.TASK_ID;
 
-  public static String REGION_NAME = "GEODE_REGION_NAME";
-  private String regionName;
-  private static String TOPICS = "TOPICS";
+public class GeodeKafkaSource extends SourceConnector {
 
   private Map<String, String> sharedProps;
   private static final ConfigDef CONFIG_DEF = new ConfigDef();
@@ -34,9 +46,13 @@
     taskProps.putAll(sharedProps);
 
     // use the same props for all tasks at the moment
-    for (int i = 0; i < maxTasks; i++)
-      taskConfigs.add(taskProps);
-
+    for (int i = 0; i < maxTasks; i++) {
+      //TODO partition regions and topics
+      // copy the props per task; sharing one mutable map would leave every task with the last TASK_ID
+      Map<String, String> taskConfig = new HashMap<>(taskProps);
+      taskConfig.put(TASK_ID, "" + i);
+      taskConfigs.add(taskConfig);
+    }
     return taskConfigs;
   }
 
@@ -48,7 +62,17 @@
 
   @Override
   public void start(Map<String, String> props) {
-    sharedProps = props;
+    sharedProps = computeMissingConfigurations(props);
+  }
+
+  private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
+    props.computeIfAbsent(LOCATORS, (key) -> DEFAULT_LOCATOR);
+    props.computeIfAbsent(DURABLE_CLIENT_TIME_OUT, (key) -> DEFAULT_DURABLE_CLIENT_TIMEOUT);
+    props.computeIfAbsent(DURABLE_CLIENT_ID_PREFIX, (key) -> DEFAULT_DURABLE_CLIENT_ID);
+    props.computeIfAbsent(BATCH_SIZE, (key) -> DEFAULT_BATCH_SIZE);
+    props.computeIfAbsent(QUEUE_SIZE, (key) -> DEFAULT_QUEUE_SIZE);
+    props.computeIfAbsent(CQ_PREFIX, (key) -> DEFAULT_CQ_PREFIX);
+    return props;
   }
 
   @Override
@@ -60,4 +84,8 @@
   public String version() {
     return AppInfoParser.getVersion();
   }
+
+  public Map<String, String> getSharedProps() {
+    return sharedProps;
+  }
 }
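
computeMissingConfigurations mutates the caller's map in place and hands it back; together with getSharedProps() that makes the default handling easy to unit test. A hypothetical JUnit 4 sketch (assumes the GeodeConnectorConfig constants imported above, plus org.junit.Test and org.junit.Assert.assertEquals; untested):

    // hypothetical test: absent keys get the documented defaults, present keys survive
    @Test
    public void startFillsInDefaultsForAbsentKeys() {
        GeodeKafkaSource source = new GeodeKafkaSource();
        Map<String, String> props = new HashMap<>();
        props.put(REGIONS, "someRegion");
        props.put(TOPICS, "someTopic");
        source.start(props);
        assertEquals(DEFAULT_LOCATOR, source.getSharedProps().get(LOCATORS));
        assertEquals(DEFAULT_BATCH_SIZE, source.getSharedProps().get(BATCH_SIZE));
        assertEquals("someRegion", source.getSharedProps().get(REGIONS));
    }
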
diff --git a/src/main/java/kafka/GeodeKafkaSourceTask.java b/src/main/java/kafka/GeodeKafkaSourceTask.java
index 9608f07..c463e6f 100644
--- a/src/main/java/kafka/GeodeKafkaSourceTask.java
+++ b/src/main/java/kafka/GeodeKafkaSourceTask.java
@@ -4,134 +4,196 @@
 import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.query.CqAttributes;
 import org.apache.geode.cache.query.CqAttributesFactory;
-import org.apache.geode.cache.query.CqEvent;
 import org.apache.geode.cache.query.CqException;
 import org.apache.geode.cache.query.CqExistsException;
-import org.apache.geode.cache.query.CqListener;
 import org.apache.geode.cache.query.RegionNotFoundException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static kafka.GeodeConnectorConfig.BATCH_SIZE;
+import static kafka.GeodeConnectorConfig.CQ_PREFIX;
+import static kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
+import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
+import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
+import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
+import static kafka.GeodeConnectorConfig.QUEUE_SIZE;
+import static kafka.GeodeConnectorConfig.REGION_NAME;
 
 public class GeodeKafkaSourceTask extends SourceTask {
-  private static String REGION_NAME = "REGION_NAME";
-  private static String OFFSET = "OFFSET";
-  private static String topics[];
-  private int batchSize;
-  private int queueSize;
-  private static BlockingQueue<CqEvent> eventBuffer;
-  private Map<String, String> sourcePartition;
-  private Map<String, Long> offset;
 
-  private ClientCache clientCache;
+    // property string passed in to identify the task
+    public static final String TASK_ID = "GEODE_TASK_ID";
+    private static final String TASK_PREFIX = "TASK";
+    private static final String DOT = ".";
+    private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
 
-  @Override
-  public String version() {
-    return null;
-  }
+    private int taskId;
+    private ClientCache clientCache;
+    private List<String> regionNames;
+    private List<String> topics;
+    private Map<String, Map<String, String>> sourcePartitions;
+    private static BlockingQueue<GeodeEvent> eventBuffer;
+    private int batchSize;
 
-  @Override
-  public void start(Map<String, String> props) {
-    System.out.println("JASON task start");
-    batchSize = 100;
-    queueSize = 100000;
-    String regionName = "someRegion";
-    eventBuffer = new LinkedBlockingQueue<>(queueSize);
-    topics = new String[] {"someTopic"};
-    sourcePartition = new HashMap<>();
-    sourcePartition.put(REGION_NAME, regionName);
 
-    offset = new HashMap<>();
-    offset.put("OFFSET", 0L);
-
-    installOnGeode("localHost", 10334, "someRegion");
-    System.out.println("JASON task start end");
-  }
-
-  @Override
-  public List<SourceRecord> poll() throws InterruptedException {
-//    System.out.println("JASON polling");
-    ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
-    ArrayList<CqEvent> events = new ArrayList<>(batchSize);
-    if (eventBuffer.drainTo(events, batchSize) > 0) {
-      for (CqEvent event : events) {
-
-        for (String topic : topics)
-          records.add(new SourceRecord(sourcePartition, offset, topic, null, event));
-      }
-
-      System.out.println("JASON we polled and returning records" + records.size());
-      return records;
+    private static Map<String, Long> createOffset() {
+        Map<String, Long> offset = new HashMap<>();
+        offset.put("OFFSET", 0L);
+        return offset;
     }
 
-//    System.out.println("JASON we didn't poll any records");
-    return null;
-  }
-
-  @Override
-  public void stop() {
-    clientCache.close(true);
-  }
-
-  private void installOnGeode(String locatorHost, int locatorPort, String regionName) {
-    clientCache = new ClientCacheFactory().set("durable-client-id", "someClient")
-        .set("durable-client-timeout", "200")
-        .setPoolSubscriptionEnabled(true).addPoolLocator(locatorHost, locatorPort).create();
-    CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
-    cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener());
-    System.out.println("JASON installing on Geode");
-    CqAttributes cqAttributes = cqAttributesFactory.create();
-    try {
-      System.out.println("JASON installing new cq");
-      clientCache.getQueryService().newCq("kafkaCQFor" + regionName, "select * from /" + regionName, cqAttributes,
-          true).execute();
-      System.out.println("JASON finished installing cq");
-    } catch (CqExistsException e) {
-      System.out.println("UHH");
-      e.printStackTrace();
-    } catch (CqException | RegionNotFoundException e) {
-      System.out.println("UHH e");
-      e.printStackTrace();
-    }
-    catch (Exception e) {
-      System.out.println("UHHHHHH " + e);
-    }
-    System.out.println("JASON task calling ready for events");
-    clientCache.readyForEvents();
-    System.out.println("JASON task ready for events");
-  }
-
-  private static class GeodeKafkaSourceListener implements CqListener {
-
     @Override
-    public void onEvent(CqEvent aCqEvent) {
-      try {
-        System.out.println("JASON cqEvent and putting into eventBuffer");
-        eventBuffer.offer(aCqEvent, 2, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
+    public String version() {
+        return null;
+    }
 
-        while (true) {
-          try {
-            if (!eventBuffer.offer(aCqEvent, 2, TimeUnit.SECONDS))
-              break;
-          } catch (InterruptedException ex) {
-            ex.printStackTrace();
-          }
-          System.out.println("GeodeKafkaSource Queue is full");
+    @Override
+    public void start(Map<String, String> props) {
+        try {
+            System.out.println("JASON task start");
+            taskId = Integer.parseInt(props.get(TASK_ID));
+            batchSize = Integer.parseInt(props.get(BATCH_SIZE));
+            int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
+            eventBuffer = new LinkedBlockingQueue<>(queueSize);
+
+            //grouping will be done in the source and not the task
+            regionNames = parseNames(props.get(GeodeConnectorConfig.REGIONS));
+            topics = parseNames(props.get(GeodeConnectorConfig.TOPICS));
+            sourcePartitions = createSourcePartitionsMap(regionNames);
+
+            String durableClientId = props.get(DURABLE_CLIENT_ID_PREFIX);
+            if (!durableClientId.equals("")) {
+                durableClientId += taskId;
+            }
+            System.out.println("JASON durable client id is:" + durableClientId);
+            String durableClientTimeout = props.get(DURABLE_CLIENT_TIME_OUT);
+            String cqPrefix = props.get(CQ_PREFIX);
+
+            List<LocatorHostPort> locators = parseLocators(props.get(GeodeConnectorConfig.LOCATORS));
+            installOnGeode(taskId, eventBuffer, locators, regionNames, durableClientId, durableClientTimeout, cqPrefix);
+            System.out.println("JASON task start finished");
         }
-      }
+        catch (Exception e) {
+            System.out.println("Exception:" + e);
+            e.printStackTrace();
+            throw e;
+        }
     }
 
     @Override
-    public void onError(CqEvent aCqEvent) {
+    public List<SourceRecord> poll() throws InterruptedException {
+        ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
+        ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
+        if (eventBuffer.drainTo(events, batchSize) > 0) {
+            for (GeodeEvent event : events) {
+                for (String topic : topics) {
+                    records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent()));
+//                    records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, "STRING"));
+                }
+            }
 
+            return records;
+        }
+
+        return null;
     }
-  }
+
+    @Override
+    public void stop() {
+        clientCache.close(true);
+    }
+
+    ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) {
+        ClientCacheFactory ccf = new ClientCacheFactory().set("durable-client-id", durableClientName)
+                .set("durable-client-timeout", durableClientTimeOut)
+                .setPoolSubscriptionEnabled(true);
+        for (LocatorHostPort locator: locators) {
+          ccf.addPoolLocator(locator.getHostName(), locator.getPort());
+        }
+        return ccf.create();
+    }
+
+    void installOnGeode(int taskId, BlockingQueue<GeodeEvent> eventBuffer, List<LocatorHostPort> locators, List<String> regionNames, String durableClientId, String durableClientTimeout, String cqPrefix) {
+        boolean isDurable = isDurable(durableClientId);
+
+        clientCache = createClientCache(locators, durableClientId, durableClientTimeout);
+        for (String region : regionNames) {
+            installListenersToRegion(taskId, eventBuffer, region, cqPrefix, isDurable);
+        }
+        if (isDurable) {
+            clientCache.readyForEvents();
+        }
+    }
+
+    void installListenersToRegion(int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) {
+        CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
+        cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName));
+        System.out.println("JASON installing on Geode");
+        CqAttributes cqAttributes = cqAttributesFactory.create();
+        try {
+            System.out.println("JASON installing new cq");
+            clientCache.getQueryService().newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
+                    isDurable).execute();
+            System.out.println("JASON finished installing cq");
+        } catch (CqExistsException e) {
+            System.out.println("UHH");
+            e.printStackTrace();
+        } catch (CqException | RegionNotFoundException e) {
+            System.out.println("UHH e");
+            e.printStackTrace();
+        } catch (Exception e) {
+            System.out.println("UHHHHHH " + e);
+        }
+    }
+
+
+    List<String> parseNames(String names) {
+        return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList());
+    }
+
+    List<LocatorHostPort> parseLocators(String locators) {
+        return Arrays.stream(locators.split(",")).map((s) -> {
+            String locatorString = s.trim();
+            return parseLocator(locatorString);
+        }).collect(Collectors.toList());
+    }
+
+    private LocatorHostPort parseLocator(String locatorString) {
+        String[] splits = locatorString.split("\\[");
+        String locator = splits[0];
+        int port = Integer.parseInt(splits[1].replace("]", ""));
+        return new LocatorHostPort(locator, port);
+    }
+
+    boolean isDurable(String durableClientId) {
+        return !durableClientId.equals("");
+    }
+
+    String generateCqName(int taskId, String cqPrefix, String regionName) {
+        return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
+    }
+
+    /**
+     * converts a list of region names into a map of source partitions
+     *
+     * @param regionNames list of region names
+     * @return a map of source partitions, keyed by region name
+     */
+    Map<String, Map<String, String>> createSourcePartitionsMap(List<String> regionNames) {
+        return regionNames.stream().map(regionName -> {
+            Map<String, String> sourcePartition = new HashMap<>();
+            sourcePartition.put(REGION_NAME, regionName);
+            return sourcePartition;
+        }).collect(Collectors.toMap(s -> s.get(REGION_NAME), s -> s));
+    }
 }
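
GeodeEvent and LocatorHostPort are used throughout the task but aren't part of this diff; presumably they are small value classes along these lines (field and accessor names inferred from the getRegionName()/getEvent() and getHostName()/getPort() call sites above):

    // assumed shape of the two helper classes referenced in this diff
    public class GeodeEvent {
        private final String regionName;
        private final CqEvent event;

        public GeodeEvent(String regionName, CqEvent event) {
            this.regionName = regionName;
            this.event = event;
        }

        public String getRegionName() { return regionName; }
        public CqEvent getEvent() { return event; }
    }

    public class LocatorHostPort {
        private final String hostName;
        private final int port;

        public LocatorHostPort(String hostName, int port) {
            this.hostName = hostName;
            this.port = port;
        }

        public String getHostName() { return hostName; }
        public int getPort() { return port; }
    }

parseLocator expects Geode's host[port] locator format, e.g. "localhost[10334]" parses to host "localhost", port 10334.
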
diff --git a/src/test/java/kafka/GeodeKafkaTestCluster.java b/src/test/java/kafka/GeodeKafkaTestCluster.java
index 2b0293f..85c0fc4 100644
--- a/src/test/java/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/kafka/GeodeKafkaTestCluster.java
@@ -1,11 +1,8 @@
 package kafka;
 
-import kafka.admin.AdminUtils;
 import kafka.admin.RackAwareMode;
 import kafka.zk.AdminZkClient;
 import kafka.zk.KafkaZkClient;
-import kafka.zookeeper.ZooKeeperClient;
-import org.I0Itec.zkclient.ZkClient;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.client.ClientCacheFactory;
@@ -16,20 +13,9 @@
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
-import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.isolation.Plugins;
-import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
-import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
-import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
-import org.apache.kafka.connect.storage.StringConverter;
-import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -40,10 +26,11 @@
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.awaitility.Awaitility.await;
 
 public class GeodeKafkaTestCluster {
 
@@ -51,6 +38,9 @@
   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
   private static boolean debug = true;
 
+  public static String TEST_TOPICS = "someTopic";
+  public static String TEST_REGIONS = "someRegion";
+
   private static ZooKeeperLocalCluster zooKeeperLocalCluster;
   private static KafkaLocalCluster kafkaLocalCluster;
   private static GeodeLocalCluster geodeLocalCluster;
@@ -63,7 +53,7 @@
     startKafka();
     startGeode();
     createTopic();
-    Thread.sleep(5000);
+
     startWorker();
     consumer = createConsumer();
     Thread.sleep(5000);
@@ -74,9 +64,8 @@
     workerAndHerderCluster.stop();
     KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
             15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
-
     AdminZkClient adminZkClient = new AdminZkClient(zkClient);
-    adminZkClient.deleteTopic("someTopic");
+    adminZkClient.deleteTopic(TEST_TOPICS);
 
     kafkaLocalCluster.stop();
     geodeLocalCluster.stop();
@@ -92,8 +81,12 @@
   private static void createTopic() {
     KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
             15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
+
+    Properties topicProperties = new Properties();
+    topicProperties.put("flush.messages", "1");
     AdminZkClient adminZkClient = new AdminZkClient(zkClient);
-    adminZkClient.createTopic("someTopic",3,1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+    adminZkClient.createTopic(TEST_TOPICS, 3, 1,
+            topicProperties, RackAwareMode.Disabled$.MODULE$);
   }
 
   private ClientCache createGeodeClient() {
@@ -125,15 +118,15 @@
 
 
   private static Properties getKafkaConfig() throws IOException {
-    int BROKER_PORT = 8888;
+    int BROKER_PORT = 9092;
     Properties props = new Properties();
 
     props.put("broker.id", "0");
+    props.put("log4j.configuration", "/Users/jhuynh/Pivotal/kafka/config/connect-log4j.properties");
     props.put("zookeeper.connect", "localhost:2181");
     props.put("host.name", "localHost");
     props.put("port", BROKER_PORT);
     props.put("offsets.topic.replication.factor", "1");
-    props.put("log.dir", (debug)? "/tmp/kafka" : temporaryFolder.newFolder("kafka").getAbsolutePath());
     props.put("log.flush.interval.messages", "1");
     props.put("log.flush.interval.ms", "10");
 
@@ -144,22 +137,6 @@
     props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
 
     //Specifically GeodeKafka connector configs
-
-
-
-/*
-name=file-source
-# The class implementing the connector
-connector.class=FileStreamSource
-# Maximum number of tasks to run for this connector instance
-tasks.max=1
-# The input file (path relative to worker's working directory)
-# This is the only setting specific to the FileStreamSource
-file=test.txt
-# The output topic in Kafka
-topic=connect-test
- */
-
     return props;
   }
 
@@ -167,37 +144,38 @@
   //consumer props, less important, just for testing?
   public static Consumer<String,String> createConsumer() {
       final Properties props = new Properties();
-    props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8888");
+    props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
     props.put(ConsumerConfig.GROUP_ID_CONFIG,
               "myGroup");
       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
               StringDeserializer.class.getName());
       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
               StringDeserializer.class.getName());
-      // Create the consumer using props.
+
+    // Create the consumer using props.
       final Consumer<String, String> consumer =
               new KafkaConsumer<>(props);
       // Subscribe to the topic.
-      consumer.subscribe(Collections.singletonList("someTopic"));
+      consumer.subscribe(Collections.singletonList(TEST_TOPICS));
       return consumer;
   }
 
   @Test
-  public void testX() throws InterruptedException {
+  public void endToEndSourceTest() {
     ClientCache client = createGeodeClient();
-    Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create("someRegion");
-    region.put("JASON KEY", "JASON VALUE");
-    System.out.println("PUT COMPLETE!");
-    region.get("JASON KEY");
-//    client.close();
+    Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGIONS);
 
-
-    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
-    for (ConsumerRecord<String, String> record: records) {
-      System.out.println("JASON we consumed a record:" + record);
-    }
-    System.out.println("TEST COMPLETE!");
-
+    //right now just verify something makes it end to end
+    AtomicInteger valueReceived = new AtomicInteger(0);
+    await().atMost(10, TimeUnit.SECONDS).until(() -> {
+      region.put("KEY", "VALUE" + System.currentTimeMillis());
+      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(4));
+      for (ConsumerRecord<String, String> record: records) {
+//        System.out.println("WE consumed a record:" + record);
+        valueReceived.incrementAndGet();
+      }
+      return valueReceived.get() > 0;
+    });
   }
 
 }
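
Worth noting: the await loop above re-puts on every iteration, so the test can pass even when earlier events never arrive, which is probably why it only "passes". A stricter variant would put once and then only poll (sketch, untested):

    // hypothetical stricter check: a single put, then wait for that one event
    region.put("KEY", "VALUE");
    AtomicInteger valueReceived = new AtomicInteger(0);
    await().atMost(30, TimeUnit.SECONDS).until(() -> {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(2))) {
            valueReceived.incrementAndGet();
        }
        return valueReceived.get() >= 1;
    });
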
diff --git a/src/test/java/kafka/LocatorLauncherWrapper.java b/src/test/java/kafka/LocatorLauncherWrapper.java
index b4340c4..c1a7075 100644
--- a/src/test/java/kafka/LocatorLauncherWrapper.java
+++ b/src/test/java/kafka/LocatorLauncherWrapper.java
@@ -16,7 +16,8 @@
 //        String statsFile = new File(context.getOutputDir(), "stats.gfs").getAbsolutePath();
 //        properties.setProperty(ConfigurationPropert/**/ies.STATISTIC_ARCHIVE_FILE, statsFile);
         properties.setProperty(ConfigurationProperties.NAME, "locator1");
-        Locator.startLocatorAndDS(10334, new File("/Users/jhuynh/Pivotal/geode-kafka-connector/"), properties);
+
+        Locator.startLocatorAndDS(10334, new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log"), properties);
         while (true) {
 
         }
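
The empty while (true) loop kept in context above pins a CPU core just to keep the locator JVM alive; parking on a latch that is never counted down does the same thing without spinning (sketch):

    // park the main thread forever without busy-waiting; await() throws InterruptedException
    new java.util.concurrent.CountDownLatch(1).await();
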
diff --git a/src/test/java/kafka/ServerLauncherWrapper.java b/src/test/java/kafka/ServerLauncherWrapper.java
index 7493d6b..933824e 100644
--- a/src/test/java/kafka/ServerLauncherWrapper.java
+++ b/src/test/java/kafka/ServerLauncherWrapper.java
@@ -37,7 +37,7 @@
                 .set(ConfigurationProperties.LOCATORS, locatorString)
                 .set(ConfigurationProperties.NAME,
                         "server-1")
-                .set(ConfigurationProperties.LOG_FILE, "/Users/jhuynh/Pivotal/geode-kafka-connector/")
+                .set(ConfigurationProperties.LOG_FILE, "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log")
                 .set(ConfigurationProperties.LOG_LEVEL, "info")
 //               .set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE, statsFile)
                 .create();